parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject parquet-mr git commit: PARQUET-248: Add ParquetWriter.Builder.
Date Thu, 25 Jun 2015 16:40:55 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 5c2ba72f9 -> cb0456274


PARQUET-248: Add ParquetWriter.Builder.

This refactors the builder recently added to parquet-avro so that it can
be used by all object models. The Builder class is abstract and
implementations should extend it.

This changes the API slightly from AvroParquetWriter, renaming
withBlockSize to withRowGroupSize. The Avro builder has not been
released so this isn't a breaking change.

Author: Ryan Blue <blue@apache.org>

Closes #199 from rdblue/PARQUET-248-add-parquet-writer-builder and squashes the following commits:

a1a25ee [Ryan Blue] PARQUET-248: Add write mode and max padding to writer builder.
622af4c [Ryan Blue] PARQUET-248: Add ParquetWriter.Builder.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/cb045627
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/cb045627
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/cb045627

Branch: refs/heads/master
Commit: cb04562742688f8a444a52c90b2183c4be528be6
Parents: 5c2ba72
Author: Ryan Blue <blue@apache.org>
Authored: Thu Jun 25 09:40:21 2015 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Thu Jun 25 09:40:21 2015 -0700

----------------------------------------------------------------------
 .../apache/parquet/avro/AvroParquetWriter.java  |  71 +----
 .../apache/parquet/hadoop/ParquetWriter.java    | 257 +++++++++++++++++--
 2 files changed, 242 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cb045627/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
index e719afc..d0c0633 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
@@ -145,67 +145,12 @@ public class AvroParquetWriter<T> extends ParquetWriter<T> {
         new AvroSchemaConverter(conf).convert(avroSchema), avroSchema, model);
   }
 
-  public static class Builder<T> {
-    private final Path file;
-    private Configuration conf = new Configuration();
-    private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
-    private int blockSize = DEFAULT_BLOCK_SIZE;
-    private int pageSize = DEFAULT_PAGE_SIZE;
-    private boolean enableDictionary = DEFAULT_IS_DICTIONARY_ENABLED;
-    private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
-    private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
-
-    // avro-specific
+  public static class Builder<T> extends ParquetWriter.Builder<T, Builder<T>> {
     private Schema schema = null;
     private GenericData model = SpecificData.get();
 
     private Builder(Path file) {
-      this.file = file;
-    }
-
-    public Builder<T> withConf(Configuration conf) {
-      this.conf = conf;
-      return this;
-    }
-
-    public Builder<T> withCompressionCodec(CompressionCodecName codecName) {
-      this.codecName = codecName;
-      return this;
-    }
-
-    public Builder<T> withBlockSize(int blockSize) {
-      this.blockSize = blockSize;
-      return this;
-    }
-
-    public Builder<T> withPageSize(int pageSize) {
-      this.pageSize = pageSize;
-      return this;
-    }
-
-    public Builder<T> enableDictionaryEncoding() {
-      this.enableDictionary = true;
-      return this;
-    }
-
-    public Builder<T> withDictionaryEncoding(boolean enableDictionary) {
-      this.enableDictionary = enableDictionary;
-      return this;
-    }
-
-    public Builder<T> enableValidation() {
-      this.enableValidation = true;
-      return this;
-    }
-
-    public Builder<T> withValidation(boolean enableValidation) {
-      this.enableValidation = enableValidation;
-      return this;
-    }
-
-    public Builder<T> withWriterVersion(WriterVersion version) {
-      this.writerVersion = version;
-      return this;
+      super(file);
     }
 
     public Builder<T> withSchema(Schema schema) {
@@ -218,14 +163,14 @@ public class AvroParquetWriter<T> extends ParquetWriter<T> {
       return this;
     }
 
-    private WriteSupport<T> getWriteSupport() {
-      return AvroParquetWriter.<T>writeSupport(conf, schema, model);
+    @Override
+    protected Builder<T> self() {
+      return this;
     }
 
-    public ParquetWriter<T> build() throws IOException {
-      return new AvroParquetWriter<T>(file, getWriteSupport(), codecName,
-          blockSize, pageSize, enableDictionary, enableValidation,
-          writerVersion, conf);
+    @Override
+    protected WriteSupport<T> getWriteSupport(Configuration conf) {
+      return AvroParquetWriter.writeSupport(conf, schema, model);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cb045627/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index c9b9eac..70abdac 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -61,6 +61,7 @@ public class ParquetWriter<T> implements Closeable {
    * @throws IOException
    * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, boolean, boolean)
    */
+  @Deprecated
   public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
     this(file, writeSupport, compressionCodecName, blockSize, pageSize,
         DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
@@ -79,6 +80,7 @@ public class ParquetWriter<T> implements Closeable {
    * @throws IOException
    * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean)
    */
+  @Deprecated
   public ParquetWriter(
       Path file,
       WriteSupport<T> writeSupport,
@@ -104,6 +106,7 @@ public class ParquetWriter<T> implements Closeable {
    * @throws IOException
    * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, WriterVersion)
    */
+  @Deprecated
   public ParquetWriter(
       Path file,
       WriteSupport<T> writeSupport,
@@ -136,6 +139,7 @@ public class ParquetWriter<T> implements Closeable {
    * @throws IOException
    * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, WriterVersion, Configuration)
    */
+  @Deprecated
   public ParquetWriter(
       Path file,
       WriteSupport<T> writeSupport,
@@ -164,6 +168,7 @@ public class ParquetWriter<T> implements Closeable {
    * @param conf Hadoop configuration to use while accessing the filesystem
    * @throws IOException
    */
+  @Deprecated
   public ParquetWriter(
       Path file,
       WriteSupport<T> writeSupport,
@@ -179,6 +184,7 @@ public class ParquetWriter<T> implements Closeable {
         compressionCodecName, blockSize, pageSize, dictionaryPageSize,
         enableDictionary, validating, writerVersion, conf);
   }
+
   /**
    * Create a new ParquetWriter.
    *
@@ -195,6 +201,7 @@ public class ParquetWriter<T> implements Closeable {
    * @param conf Hadoop configuration to use while accessing the filesystem
    * @throws IOException
    */
+  @Deprecated
   public ParquetWriter(
       Path file,
       ParquetFileWriter.Mode mode,
@@ -207,29 +214,9 @@ public class ParquetWriter<T> implements Closeable {
       boolean validating,
       WriterVersion writerVersion,
       Configuration conf) throws IOException {
-
-    WriteSupport.WriteContext writeContext = writeSupport.init(conf);
-    MessageType schema = writeContext.getSchema();
-
-    // TODO: in a follow-up issue, add max padding to the builder
-    ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file,
-        mode, blockSize, MAX_PADDING_SIZE_DEFAULT);
-    fileWriter.start();
-
-    CodecFactory codecFactory = new CodecFactory(conf);
-    CodecFactory.BytesCompressor compressor =	codecFactory.getCompressor(compressionCodecName, 0);
-    this.writer = new InternalParquetRecordWriter<T>(
-        fileWriter,
-        writeSupport,
-        schema,
-        writeContext.getExtraMetaData(),
-        blockSize,
-        pageSize,
-        compressor,
-        dictionaryPageSize,
-        enableDictionary,
-        validating,
-        writerVersion);
+    this(file, mode, writeSupport, compressionCodecName, blockSize, pageSize,
+        dictionaryPageSize, enableDictionary, validating, writerVersion, conf,
+        MAX_PADDING_SIZE_DEFAULT);
   }
 
   /**
@@ -240,10 +227,12 @@ public class ParquetWriter<T> implements Closeable {
    * @param writeSupport the implementation to write a record to a RecordConsumer
    * @throws IOException
    */
+  @Deprecated
   public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
     this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
   }
 
+  @Deprecated
   public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport) throws IOException {
     this(file,
         writeSupport,
@@ -257,6 +246,43 @@ public class ParquetWriter<T> implements Closeable {
         conf);
   }
 
+  ParquetWriter(
+      Path file,
+      ParquetFileWriter.Mode mode,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      int blockSize,
+      int pageSize,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      WriterVersion writerVersion,
+      Configuration conf,
+      int maxPaddingSize) throws IOException {
+
+    WriteSupport.WriteContext writeContext = writeSupport.init(conf);
+    MessageType schema = writeContext.getSchema();
+
+    ParquetFileWriter fileWriter = new ParquetFileWriter(
+        conf, schema, file, mode, blockSize, maxPaddingSize);
+    fileWriter.start();
+
+    CodecFactory codecFactory = new CodecFactory(conf);
+    CodecFactory.BytesCompressor compressor =	codecFactory.getCompressor(compressionCodecName, 0);
+    this.writer = new InternalParquetRecordWriter<T>(
+        fileWriter,
+        writeSupport,
+        schema,
+        writeContext.getExtraMetaData(),
+        blockSize,
+        pageSize,
+        compressor,
+        dictionaryPageSize,
+        enableDictionary,
+        validating,
+        writerVersion);
+  }
+
   public void write(T object) throws IOException {
     try {
       writer.write(object);
@@ -273,4 +299,189 @@ public class ParquetWriter<T> implements Closeable {
       throw new IOException(e);
     }
   }
+
+  /**
+   * An abstract builder class for ParquetWriter instances.
+   *
+   * Object models should extend this builder to provide writer configuration
+   * options.
+   *
+   * @param <T> The type of objects written by the constructed ParquetWriter.
+   * @param <SELF> The type of this builder that is returned by builder methods
+   */
+  public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
+    private final Path file;
+    private Configuration conf = new Configuration();
+    private ParquetFileWriter.Mode mode;
+    private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
+    private int rowGroupSize = DEFAULT_BLOCK_SIZE;
+    private int pageSize = DEFAULT_PAGE_SIZE;
+    private int dictionaryPageSize = DEFAULT_PAGE_SIZE;
+    private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
+    private boolean enableDictionary = DEFAULT_IS_DICTIONARY_ENABLED;
+    private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
+    private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
+
+    protected Builder(Path file) {
+      this.file = file;
+    }
+
+    /**
+     * @return this as the correct subclass of ParquetWriter.Builder.
+     */
+    protected abstract SELF self();
+
+    /**
+     * @return an appropriate WriteSupport for the object model.
+     */
+    protected abstract WriteSupport<T> getWriteSupport(Configuration conf);
+
+    /**
+     * Set the {@link Configuration} used by the constructed writer.
+     *
+     * @param conf a {@code Configuration}
+     * @return this builder for method chaining.
+     */
+    public SELF withConf(Configuration conf) {
+      this.conf = conf;
+      return self();
+    }
+
+    /**
+     * Set the {@link ParquetFileWriter.Mode write mode} used when creating the
+     * backing file for this writer.
+     *
+     * @param mode a {@code ParquetFileWriter.Mode}
+     * @return this builder for method chaining.
+     */
+    public SELF withWriteMode(ParquetFileWriter.Mode mode) {
+      this.mode = mode;
+      return self();
+    }
+
+    /**
+     * Set the {@link CompressionCodecName compression codec} used by the
+     * constructed writer.
+     *
+     * @param codecName a {@code CompressionCodecName}
+     * @return this builder for method chaining.
+     */
+    public SELF withCompressionCodec(CompressionCodecName codecName) {
+      this.codecName = codecName;
+      return self();
+    }
+
+    /**
+     * Set the Parquet format row group size used by the constructed writer.
+     *
+     * @param rowGroupSize an integer size in bytes
+     * @return this builder for method chaining.
+     */
+    public SELF withRowGroupSize(int rowGroupSize) {
+      this.rowGroupSize = rowGroupSize;
+      return self();
+    }
+
+    /**
+     * Set the Parquet format page size used by the constructed writer.
+     *
+     * @param pageSize an integer size in bytes
+     * @return this builder for method chaining.
+     */
+    public SELF withPageSize(int pageSize) {
+      this.pageSize = pageSize;
+      return self();
+    }
+
+    /**
+     * Set the Parquet format dictionary page size used by the constructed
+     * writer.
+     *
+     * @param dictionaryPageSize an integer size in bytes
+     * @return this builder for method chaining.
+     */
+    public SELF withDictionaryPageSize(int dictionaryPageSize) {
+      this.dictionaryPageSize = dictionaryPageSize;
+      return self();
+    }
+
+    /**
+     * Set the maximum amount of padding, in bytes, that will be used to align
+     * row groups with blocks in the underlying filesystem. If the underlying
+     * filesystem is not a block filesystem like HDFS, this has no effect.
+     *
+     * @param maxPaddingSize an integer size in bytes
+     * @return this builder for method chaining.
+     */
+    public SELF withMaxPaddingSize(int maxPaddingSize) {
+      this.maxPaddingSize = maxPaddingSize;
+      return self();
+    }
+
+    /**
+     * Enables dictionary encoding for the constructed writer.
+     *
+     * @return this builder for method chaining.
+     */
+    public SELF enableDictionaryEncoding() {
+      this.enableDictionary = true;
+      return self();
+    }
+
+    /**
+     * Enable or disable dictionary encoding for the constructed writer.
+     *
+     * @param enableDictionary whether dictionary encoding should be enabled
+     * @return this builder for method chaining.
+     */
+    public SELF withDictionaryEncoding(boolean enableDictionary) {
+      this.enableDictionary = enableDictionary;
+      return self();
+    }
+
+    /**
+     * Enables validation for the constructed writer.
+     *
+     * @return this builder for method chaining.
+     */
+    public SELF enableValidation() {
+      this.enableValidation = true;
+      return self();
+    }
+
+    /**
+     * Enable or disable validation for the constructed writer.
+     *
+     * @param enableValidation whether validation should be enabled
+     * @return this builder for method chaining.
+     */
+    public SELF withValidation(boolean enableValidation) {
+      this.enableValidation = enableValidation;
+      return self();
+    }
+
+    /**
+     * Set the {@link WriterVersion format version} used by the constructed
+     * writer.
+     *
+     * @param version a {@code WriterVersion}
+     * @return this builder for method chaining.
+     */
+    public SELF withWriterVersion(WriterVersion version) {
+      this.writerVersion = version;
+      return self();
+    }
+
+    /**
+     * Build a {@link ParquetWriter} with the accumulated configuration.
+     *
+     * @return a configured {@code ParquetWriter} instance.
+     * @throws IOException
+     */
+    public ParquetWriter<T> build() throws IOException {
+      return new ParquetWriter<T>(file, mode, getWriteSupport(conf), codecName,
+          rowGroupSize, pageSize, dictionaryPageSize, enableDictionary,
+          enableValidation, writerVersion, conf, maxPaddingSize);
+    }
+  }
 }


Mime
View raw message