parquet-commits mailing list archives

From alexleven...@apache.org
Subject parquet-mr git commit: PARQUET-381: Add feature to merge metadata (summary) files, and control which files are generated
Date Tue, 13 Oct 2015 22:54:11 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master c3819688c -> b1ea059a6


PARQUET-381: Add feature to merge metadata (summary) files, and control which files are generated

1) Add helper to merge 2 summary files, useful for merging 2 directories of data into 1
2) Add more control over whether _common_metadata, _metadata, or both are written
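
For illustration, a minimal sketch of how the merge helper from 1) might be invoked; the input and output paths are hypothetical, and parquet-hadoop from this commit is assumed to be on the classpath:

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileWriter;

    public class MergeSummariesSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Merge two _metadata files into one; the inputs must have been
        // written with compatible schemas and exactly equal extraMetaData.
        ParquetFileWriter.writeMergedMetadataFile(
            Arrays.asList(
                new Path("/data/day1/_metadata"),    // hypothetical inputs
                new Path("/data/day2/_metadata")),
            new Path("/data/merged/_metadata"),      // hypothetical output
            conf);
      }
    }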

Author: Alex Levenson <alexlevenson@twitter.com>

Closes #277 from isnotinvain/alexlevenson/merge-summary-files and squashes the following commits:

86232f5 [Alex Levenson] Address comments
96b9495 [Alex Levenson] Fix null extraMetaData
099c913 [Alex Levenson] Make deprecated method delegate to new method
7a98957 [Alex Levenson] Merge branch 'master' into alexlevenson/merge-summary-files
ddaf4ff [Alex Levenson] Introduce job summary levels for controlling which metadata files are generated
87a2ebc [Alex Levenson] Update comments
9d2b8da [Alex Levenson] Add helper method for merging metadata files


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/b1ea059a
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/b1ea059a
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/b1ea059a

Branch: refs/heads/master
Commit: b1ea059a66c7d6d6bb4cb53d2005a9b7bb599ada
Parents: c381968
Author: Alex Levenson <alexlevenson@twitter.com>
Authored: Tue Oct 13 15:54:03 2015 -0700
Committer: Alex Levenson <alexlevenson@twitter.com>
Committed: Tue Oct 13 15:54:03 2015 -0700

----------------------------------------------------------------------
 .../parquet/hadoop/ParquetFileReader.java       |  13 +-
 .../parquet/hadoop/ParquetFileWriter.java       |  65 +++++-
 .../parquet/hadoop/ParquetOutputCommitter.java  |  66 ++++--
 .../parquet/hadoop/ParquetOutputFormat.java     |  51 ++++-
 .../hadoop/example/ExampleParquetWriter.java    |  11 +-
 .../hadoop/example/GroupWriteSupport.java       |  12 +-
 .../parquet/hadoop/TestMergeMetadataFiles.java  | 215 +++++++++++++++++++
 .../parquet/hadoop/TestParquetFileWriter.java   |   5 +-
 .../TestParquetOutputFormatJobSummaryLevel.java |  69 ++++++
 9 files changed, 478 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index ea7a672..f43e692 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -252,6 +252,15 @@ public class ParquetFileReader implements Closeable {
   /**
    * Read the footers of all the files under that path (recursively)
    * not using summary files.
+   */
+  public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus, boolean skipRowGroups) throws IOException {
+    List<FileStatus> statuses = listFiles(configuration, fileStatus);
+    return readAllFootersInParallel(configuration, statuses, skipRowGroups);
+  }
+
+  /**
+   * Read the footers of all the files under that path (recursively)
+   * not using summary files.
    * rowGroups are not skipped
    * @param configuration the configuration to access the FS
    * @param fileStatus the root dir
@@ -259,10 +268,10 @@ public class ParquetFileReader implements Closeable {
    * @throws IOException
    */
   public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException {
-    List<FileStatus> statuses = listFiles(configuration, fileStatus);
-    return readAllFootersInParallel(configuration, statuses, false);
+    return readAllFootersInParallel(configuration, fileStatus, false);
   }
 
+
   @Deprecated
   public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException {
     return readFooters(configuration, status(configuration, path));
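
A hedged sketch of the new overload added above; the directory path is hypothetical. Passing true for skipRowGroups reads footers without row-group info, which is cheaper when only the schema is needed:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.Footer;
    import org.apache.parquet.hadoop.ParquetFileReader;

    public class ReadFootersSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path root = new Path("/data/out");  // hypothetical directory
        FileStatus status = root.getFileSystem(conf).getFileStatus(root);
        // New overload: skip row-group info when only the schema is needed.
        List<Footer> footers = ParquetFileReader.readAllFootersInParallel(conf, status, true);
        System.out.println("footers read: " + footers.size());
      }
    }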

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index d6d8369..664ee9d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.parquet.Log;
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
@@ -46,6 +47,7 @@ import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -474,30 +476,83 @@ public class ParquetFileWriter {
     org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
     writeFileMetaData(parquetMetadata, out);
     if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
-    BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
+    BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
     out.write(MAGIC);
   }
 
   /**
+   * Given a list of metadata files, merge them into a single ParquetMetadata
+   * Requires that the schemas be compatible, and the extraMetadata be exactly equal.
+   */
+  public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf) throws IOException {
+    Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list of metadata");
+
+    GlobalMetaData globalMetaData = null;
+    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+
+    for (Path p : files) {
+      ParquetMetadata pmd = ParquetFileReader.readFooter(conf, p, ParquetMetadataConverter.NO_FILTER);
+      FileMetaData fmd = pmd.getFileMetaData();
+      globalMetaData = mergeInto(fmd, globalMetaData, true);
+      blocks.addAll(pmd.getBlocks());
+    }
+
+    // collapse GlobalMetaData into a single FileMetaData, which will throw if they are not compatible
+    return new ParquetMetadata(globalMetaData.merge(), blocks);
+  }
+
+  /**
+   * Given a list of metadata files, merge them into a single metadata file.
+   * Requires that the schemas be compatible, and the extraMetaData be exactly equal.
+   * This is useful when merging 2 directories of parquet files into a single directory, as long
+   * as both directories were written with compatible schemas and equal extraMetaData.
+   */
+  public static void writeMergedMetadataFile(List<Path> files, Path outputPath, Configuration conf) throws IOException {
+    ParquetMetadata merged = mergeMetadataFiles(files, conf);
+    writeMetadataFile(outputPath, merged, outputPath.getFileSystem(conf));
+  }
+
+  /**
    * writes a _metadata and _common_metadata file
    * @param configuration the configuration to use to get the FileSystem
    * @param outputPath the directory to write the _metadata file to
    * @param footers the list of footers to merge
+   * @deprecated use the variant of writeMetadataFile that takes a {@link JobSummaryLevel} as an argument.
    * @throws IOException
    */
+  @Deprecated
   public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
+    writeMetadataFile(configuration, outputPath, footers, JobSummaryLevel.ALL);
+  }
+
+  /**
+   * writes _common_metadata file, and optionally a _metadata file depending on the {@link JobSummaryLevel} provided
+   */
+  public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers, JobSummaryLevel level) throws IOException {
+    Preconditions.checkArgument(level == JobSummaryLevel.ALL || level == JobSummaryLevel.COMMON_ONLY,
+        "Unsupported level: " + level);
+
     FileSystem fs = outputPath.getFileSystem(configuration);
     outputPath = outputPath.makeQualified(fs);
     ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
-    writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
+
+    if (level == JobSummaryLevel.ALL) {
+      writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
+    }
+
     metadataFooter.getBlocks().clear();
     writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
   }
 
-  private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
+  private static void writeMetadataFile(Path outputPathRoot, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
+      throws IOException {
+    Path metaDataPath = new Path(outputPathRoot, parquetMetadataFile);
+    writeMetadataFile(metaDataPath, metadataFooter, fs);
+  }
+
+  private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs)
       throws IOException {
-    Path metaDataPath = new Path(outputPath, parquetMetadataFile);
-    FSDataOutputStream metadata = fs.create(metaDataPath);
+    FSDataOutputStream metadata = fs.create(outputPath);
     metadata.write(MAGIC);
     serializeFooter(metadataFooter, metadata);
     metadata.close();
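
Beyond writing a merged file, the new in-memory variant above can be used directly; a hedged sketch (input paths hypothetical). mergeMetadataFiles reads each footer, accumulates a GlobalMetaData, and throws when schemas are incompatible or extraMetaData values conflict:

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    public class InspectMergedMetadataSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Merge in memory without writing a file; throws on incompatible
        // schemas or conflicting extraMetaData.
        ParquetMetadata merged = ParquetFileWriter.mergeMetadataFiles(
            Arrays.asList(
                new Path("/data/part1/_metadata"),   // hypothetical paths
                new Path("/data/part2/_metadata")),
            conf);
        System.out.println("merged row groups: " + merged.getBlocks().size());
      }
    }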

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java
index 9a0930a..45455ef 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 
 import org.apache.parquet.Log;
+import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.util.ContextUtil;
 
 public class ParquetOutputCommitter extends FileOutputCommitter {
@@ -48,30 +49,63 @@ public class ParquetOutputCommitter extends FileOutputCommitter {
     writeMetaDataFile(configuration,outputPath);
   }
 
+  // TODO: This method should propagate errors, and we should clean up
+  // TODO: all the catching of Exceptions below -- see PARQUET-383
   public static void writeMetaDataFile(Configuration configuration, Path outputPath) {
-    if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
+    JobSummaryLevel level = ParquetOutputFormat.getJobSummaryLevel(configuration);
+    if (level == JobSummaryLevel.NONE) {
+      return;
+    }
+
+    try {
+      final FileSystem fileSystem = outputPath.getFileSystem(configuration);
+      FileStatus outputStatus = fileSystem.getFileStatus(outputPath);
+      List<Footer> footers;
+
+      switch (level) {
+        case ALL:
+          footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus, false); // don't skip row groups
+          break;
+        case COMMON_ONLY:
+          footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus, true); // skip row groups
+          break;
+        default:
+          throw new IllegalArgumentException("Unrecognized job summary level: " + level);
+      }
+
+      // If there are no footers, _metadata file cannot be written since there is no way to determine schema!
+      // Onus of writing any summary files lies with the caller in this case.
+      if (footers.isEmpty()) {
+        return;
+      }
+
       try {
-        final FileSystem fileSystem = outputPath.getFileSystem(configuration);
-        FileStatus outputStatus = fileSystem.getFileStatus(outputPath);
-        List<Footer> footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus);
-        // If there are no footers, _metadata file cannot be written since there is no way to determine schema!
-        // Onus of writing any summary files lies with the caller in this case.
-        if (footers.isEmpty()) {
-          return;
-        }
+        ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers, level);
+      } catch (Exception e) {
+        LOG.warn("could not write summary file(s) for " + outputPath, e);
+
+        final Path metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE);
+
         try {
-          ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers);
-        } catch (Exception e) {
-          LOG.warn("could not write summary file for " + outputPath, e);
-          final Path metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE);
           if (fileSystem.exists(metadataPath)) {
             fileSystem.delete(metadataPath, true);
           }
+        } catch (Exception e2) {
+          LOG.warn("could not delete metadata file" + outputPath, e2);
         }
-      } catch (Exception e) {
-        LOG.warn("could not write summary file for " + outputPath, e);
+
+        try {
+          final Path commonMetadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE);
+          if (fileSystem.exists(commonMetadataPath)) {
+            fileSystem.delete(commonMetadataPath, true);
+          }
+        } catch (Exception e2) {
+          LOG.warn("could not delete metadata file" + outputPath, e2);
+        }
+
       }
+    } catch (Exception e) {
+      LOG.warn("could not write summary file for " + outputPath, e);
     }
   }
-
 }
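
To illustrate the committer's new behavior, a small sketch (the output directory is hypothetical): writeMetaDataFile is public static, so it can be exercised outside of a job; with level NONE it returns immediately, and with COMMON_ONLY it reads footers with row groups skipped and writes only _common_metadata:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetOutputCommitter;
    import org.apache.parquet.hadoop.ParquetOutputFormat;

    public class CommitterLevelSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Only _common_metadata will be written for this output directory.
        conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "common_only");
        ParquetOutputCommitter.writeMetaDataFile(conf, new Path("/data/job-output"));
      }
    }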

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index e075db3..ad6c034 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -103,6 +103,33 @@ import org.apache.parquet.hadoop.util.ConfigurationUtil;
 public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   private static final Log LOG = Log.getLog(ParquetOutputFormat.class);
 
+  public static enum JobSummaryLevel {
+    /**
+     * Write no summary files
+     */
+    NONE,
+    /**
+     * Write both summary file with row group info and summary file without
+     * (both _metadata and _common_metadata)
+     */
+    ALL,
+    /**
+     * Write only the summary file without the row group info
+     * (_common_metadata only)
+     */
+    COMMON_ONLY
+  }
+
+  /**
+   * An alias for JOB_SUMMARY_LEVEL, where true means ALL and false means NONE
+   */
+  @Deprecated
+  public static final String ENABLE_JOB_SUMMARY   = "parquet.enable.summary-metadata";
+
+  /**
+   * Must be one of the values in {@link JobSummaryLevel} (case insensitive)
+   */
+  public static final String JOB_SUMMARY_LEVEL = "parquet.summary.metadata.level";
   public static final String BLOCK_SIZE           = "parquet.block.size";
   public static final String PAGE_SIZE            = "parquet.page.size";
   public static final String COMPRESSION          = "parquet.compression";
@@ -111,7 +138,6 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public static final String ENABLE_DICTIONARY    = "parquet.enable.dictionary";
   public static final String VALIDATION           = "parquet.validation";
   public static final String WRITER_VERSION       = "parquet.writer.version";
-  public static final String ENABLE_JOB_SUMMARY   = "parquet.enable.summary-metadata";
   public static final String MEMORY_POOL_RATIO    = "parquet.memory.pool.ratio";
   public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
   public static final String MAX_PADDING_BYTES    = "parquet.writer.max-padding";
@@ -119,6 +145,29 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   // default to no padding for now
   private static final int DEFAULT_MAX_PADDING_SIZE = 0;
 
+  public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
+    String level = conf.get(JOB_SUMMARY_LEVEL);
+    String deprecatedFlag = conf.get(ENABLE_JOB_SUMMARY);
+
+    if (deprecatedFlag != null) {
+      LOG.warn("Setting " + ENABLE_JOB_SUMMARY + " is deprecated, please use " + JOB_SUMMARY_LEVEL);
+    }
+
+    if (level != null && deprecatedFlag != null) {
+      LOG.warn("Both " + JOB_SUMMARY_LEVEL + " and " + ENABLE_JOB_SUMMARY + " are set! "
+ ENABLE_JOB_SUMMARY + " will be ignored.");
+    }
+
+    if (level != null) {
+      return JobSummaryLevel.valueOf(level.toUpperCase());
+    }
+
+    if (deprecatedFlag != null) {
+      return Boolean.valueOf(deprecatedFlag) ? JobSummaryLevel.ALL : JobSummaryLevel.NONE;
+    }
+
+    return JobSummaryLevel.ALL;
+  }
+
   public static void setWriteSupportClass(Job job,  Class<?> writeSupportClass) {
     getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
   }
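
The resolution order implemented in getJobSummaryLevel above can be exercised directly; this sketch mirrors the precedence rule (the new key wins over the deprecated boolean), matching the testLevelTakesPrecedence test added below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.hadoop.ParquetOutputFormat;

    public class SummaryLevelPrecedenceSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // When both keys are set, the new key wins and the deprecated
        // boolean is ignored (with a warning logged).
        conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "common_only");
        conf.set(ParquetOutputFormat.ENABLE_JOB_SUMMARY, "false");
        System.out.println(ParquetOutputFormat.getJobSummaryLevel(conf)); // COMMON_ONLY
      }
    }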

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
index c63be91..88879c2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
@@ -27,6 +27,8 @@ import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * An example file writer class.
@@ -70,6 +72,7 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
 
   public static class Builder extends ParquetWriter.Builder<Group, Builder> {
     private MessageType type = null;
+    private Map<String, String> extraMetaData = new HashMap<String, String>();
 
     private Builder(Path file) {
       super(file);
@@ -80,6 +83,11 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
       return this;
     }
 
+    public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+      this.extraMetaData = extraMetaData;
+      return this;
+    }
+
     @Override
     protected Builder self() {
       return this;
@@ -87,7 +95,8 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
 
     @Override
     protected WriteSupport<Group> getWriteSupport(Configuration conf) {
-      return new GroupWriteSupport(type);
+      return new GroupWriteSupport(type, extraMetaData);
     }
+
   }
 }
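
The new withExtraMetaData builder hook can be used like this; a minimal sketch with a hypothetical schema, output path, and metadata key. The extraMetaData map ends up in the file footer's key-value metadata, which is what the merge helper later checks for equality:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.example.GroupWriteSupport;
    import org.apache.parquet.schema.MessageType;
    import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;

    public class ExtraMetaDataSketch {
      public static void main(String[] args) throws IOException {
        MessageType schema = parseMessageType("message m { required int32 id; }");
        Configuration conf = new Configuration();
        GroupWriteSupport.setSchema(schema, conf);

        Map<String, String> extra = new HashMap<String, String>();
        extra.put("writer.job", "example");  // hypothetical key/value

        // extraMetaData is carried through GroupWriteSupport into the footer.
        ParquetWriter<Group> writer = ExampleParquetWriter
            .builder(new Path("/tmp/extra-meta.parquet"))  // hypothetical path
            .withConf(conf)
            .withExtraMetaData(extra)
            .build();
        writer.write(new SimpleGroupFactory(schema).newGroup().append("id", 1));
        writer.close();
      }
    }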

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
index 25f8fe5..ee59a6e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
@@ -22,6 +22,7 @@ import static org.apache.parquet.Preconditions.checkNotNull;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -45,14 +46,21 @@ public class GroupWriteSupport extends WriteSupport<Group> {
     return parseMessageType(checkNotNull(configuration.get(PARQUET_EXAMPLE_SCHEMA), PARQUET_EXAMPLE_SCHEMA));
   }
 
-  private MessageType schema = null;
+  private MessageType schema;
   private GroupWriter groupWriter;
+  private Map<String, String> extraMetaData;
 
   public GroupWriteSupport() {
+    this(null, new HashMap<String, String>());
   }
 
   GroupWriteSupport(MessageType schema) {
+    this(schema, new HashMap<String, String>());
+  }
+
+  GroupWriteSupport(MessageType schema, Map<String, String> extraMetaData) {
     this.schema = schema;
+    this.extraMetaData = extraMetaData;
   }
 
   @Override
@@ -61,7 +69,7 @@ public class GroupWriteSupport extends WriteSupport<Group> {
     if (schema == null) {
       schema = getSchema(configuration);
     }
-    return new WriteContext(schema, new HashMap<String, String>());
+    return new WriteContext(schema, this.extraMetaData);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java
new file mode 100644
index 0000000..6f86062
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestMergeMetadataFiles {
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static final MessageType schema = parseMessageType(
+      "message test { "
+          + "required binary binary_field; "
+          + "required int32 int32_field; "
+          + "required int64 int64_field; "
+          + "required boolean boolean_field; "
+          + "required float float_field; "
+          + "required double double_field; "
+          + "required fixed_len_byte_array(3) flba_field; "
+          + "required int96 int96_field; "
+          + "} ");
+
+  // schema1 with a field removed
+  private static final MessageType schema2 = parseMessageType(
+      "message test { "
+          + "required binary binary_field; "
+          + "required int32 int32_field; "
+          + "required int64 int64_field; "
+          + "required boolean boolean_field; "
+          + "required float float_field; "
+          + "required double double_field; "
+          + "required fixed_len_byte_array(3) flba_field; "
+          + "} ");
+
+  private static void writeFile(File out, Configuration conf, boolean useSchema2) throws IOException {
+    if (!useSchema2) {
+      GroupWriteSupport.setSchema(schema, conf);
+    } else {
+      GroupWriteSupport.setSchema(schema2, conf);
+    }
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+    Map<String, String> extraMetaData = new HashMap<String, String>();
+    extraMetaData.put("schema_num", useSchema2 ? "2" : "1" );
+
+    ParquetWriter<Group> writer = ExampleParquetWriter
+        .builder(new Path(out.getAbsolutePath()))
+        .withConf(conf)
+        .withExtraMetaData(extraMetaData)
+        .build();
+
+      for (int i = 0; i < 1000; i++) {
+        Group g = f.newGroup()
+            .append("binary_field", "test" + i)
+            .append("int32_field", i)
+            .append("int64_field", (long) i)
+            .append("boolean_field", i % 2 == 0)
+            .append("float_field", (float) i)
+            .append("double_field", (double)i)
+            .append("flba_field", "foo");
+
+        if (!useSchema2) {
+          g = g.append("int96_field", Binary.fromConstantByteArray(new byte[12]));
+        }
+
+        writer.write(g);
+      }
+      writer.close();
+  }
+
+  private static class WrittenFileInfo {
+    public Configuration conf;
+    public Path metaPath1;
+    public Path metaPath2;
+    public Path commonMetaPath1;
+    public Path commonMetaPath2;
+  }
+
+  private WrittenFileInfo writeFiles(boolean mixedSchemas) throws Exception {
+    WrittenFileInfo info = new WrittenFileInfo();
+    Configuration conf = new Configuration();
+    info.conf = conf;
+
+    File root1 = new File(temp.getRoot(), "out1");
+    File root2 = new File(temp.getRoot(), "out2");
+    Path rootPath1 = new Path(root1.getAbsolutePath());
+    Path rootPath2 = new Path(root2.getAbsolutePath());
+
+    for (int i = 0; i < 10; i++) {
+      writeFile(new File(root1, i + ".parquet"), conf, true);
+    }
+
+    List<Footer> footers = ParquetFileReader.readFooters(conf, rootPath1.getFileSystem(conf).getFileStatus(rootPath1), false);
+    ParquetFileWriter.writeMetadataFile(conf, rootPath1, footers, JobSummaryLevel.ALL);
+
+    for (int i = 0; i < 7; i++) {
+      writeFile(new File(root2, i + ".parquet"), conf, !mixedSchemas);
+    }
+
+    footers = ParquetFileReader.readFooters(conf, rootPath2.getFileSystem(conf).getFileStatus(rootPath2), false);
+    ParquetFileWriter.writeMetadataFile(conf, rootPath2, footers, JobSummaryLevel.ALL);
+
+    info.commonMetaPath1 = new Path(new File(root1, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE).getAbsolutePath());
+    info.commonMetaPath2 = new Path(new File(root2, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE).getAbsolutePath());
+    info.metaPath1 = new Path(new File(root1, ParquetFileWriter.PARQUET_METADATA_FILE).getAbsolutePath());
+    info.metaPath2 = new Path(new File(root2, ParquetFileWriter.PARQUET_METADATA_FILE).getAbsolutePath());
+
+    return info;
+  }
+
+  @Test
+  public void testMergeMetadataFiles() throws Exception {
+    WrittenFileInfo info = writeFiles(false);
+
+    ParquetMetadata commonMeta1 = ParquetFileReader.readFooter(info.conf, info.commonMetaPath1, ParquetMetadataConverter.NO_FILTER);
+    ParquetMetadata commonMeta2 = ParquetFileReader.readFooter(info.conf, info.commonMetaPath2, ParquetMetadataConverter.NO_FILTER);
+    ParquetMetadata meta1 = ParquetFileReader.readFooter(info.conf, info.metaPath1, ParquetMetadataConverter.NO_FILTER);
+    ParquetMetadata meta2 = ParquetFileReader.readFooter(info.conf, info.metaPath2, ParquetMetadataConverter.NO_FILTER);
+
+    assertTrue(commonMeta1.getBlocks().isEmpty());
+    assertTrue(commonMeta2.getBlocks().isEmpty());
+    assertEquals(commonMeta1.getFileMetaData().getSchema(), commonMeta2.getFileMetaData().getSchema());
+
+    assertFalse(meta1.getBlocks().isEmpty());
+    assertFalse(meta2.getBlocks().isEmpty());
+    assertEquals(meta1.getFileMetaData().getSchema(), meta2.getFileMetaData().getSchema());
+
+
+    assertEquals(commonMeta1.getFileMetaData().getKeyValueMetaData(), commonMeta2.getFileMetaData().getKeyValueMetaData());
+    assertEquals(meta1.getFileMetaData().getKeyValueMetaData(), meta2.getFileMetaData().getKeyValueMetaData());
+
+    // test file serialization
+    Path mergedOut = new Path(new File(temp.getRoot(), "merged_meta").getAbsolutePath());
+    Path mergedCommonOut = new Path(new File(temp.getRoot(), "merged_common_meta").getAbsolutePath());
+    ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.metaPath1, info.metaPath2), mergedOut, info.conf);
+    ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.commonMetaPath1, info.commonMetaPath2), mergedCommonOut, info.conf);
+
+    ParquetMetadata mergedMeta = ParquetFileReader.readFooter(info.conf, mergedOut, ParquetMetadataConverter.NO_FILTER);
+    ParquetMetadata mergedCommonMeta = ParquetFileReader.readFooter(info.conf, mergedCommonOut, ParquetMetadataConverter.NO_FILTER);
+
+    // ideally we'd assert equality here, but BlockMetaData and its references don't implement equals
+    assertEquals(meta1.getBlocks().size() + meta2.getBlocks().size(), mergedMeta.getBlocks().size());
+    assertTrue(mergedCommonMeta.getBlocks().isEmpty());
+
+    assertEquals(meta1.getFileMetaData().getSchema(), mergedMeta.getFileMetaData().getSchema());
+    assertEquals(commonMeta1.getFileMetaData().getSchema(), mergedCommonMeta.getFileMetaData().getSchema());
+
+    assertEquals(meta1.getFileMetaData().getKeyValueMetaData(), mergedMeta.getFileMetaData().getKeyValueMetaData());
+    assertEquals(commonMeta1.getFileMetaData().getKeyValueMetaData(), mergedCommonMeta.getFileMetaData().getKeyValueMetaData());
+  }
+
+  @Test
+  public void testThrowsWhenIncompatible() throws Exception {
+    WrittenFileInfo info = writeFiles(true);
+
+    Path mergedOut = new Path(new File(temp.getRoot(), "merged_meta").getAbsolutePath());
+    Path mergedCommonOut = new Path(new File(temp.getRoot(), "merged_common_meta").getAbsolutePath());
+
+    try {
+      ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.metaPath1, info.metaPath2), mergedOut, info.conf);
+      fail("this should throw");
+    } catch (RuntimeException e) {
+      assertEquals("could not merge metadata: key schema_num has conflicting values: [2,
1]", e.getMessage());
+    }
+
+    try {
+      ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.commonMetaPath1, info.commonMetaPath2), mergedCommonOut, info.conf);
+      fail("this should throw");
+    } catch (RuntimeException e) {
+      assertEquals("could not merge metadata: key schema_num has conflicting values: [2,
1]", e.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index d22b657..597daa8 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -28,6 +28,7 @@ import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.Version;
 import org.apache.parquet.VersionParser;
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;
@@ -550,7 +551,7 @@ public class TestParquetFileWriter {
     FileStatus outputStatus = fs.getFileStatus(testDirPath);
     List<Footer> footers = ParquetFileReader.readFooters(configuration, outputStatus, false);
     validateFooters(footers);
-    ParquetFileWriter.writeMetadataFile(configuration, testDirPath, footers);
+    ParquetFileWriter.writeMetadataFile(configuration, testDirPath, footers, JobSummaryLevel.ALL);
 
     footers = ParquetFileReader.readFooters(configuration, outputStatus, false);
     validateFooters(footers);
@@ -759,7 +760,7 @@ public class TestParquetFileWriter {
     footers.add(footer);
 
     // This should not throw an exception
-    ParquetFileWriter.writeMetadataFile(conf, relativeRoot, footers);
+    ParquetFileWriter.writeMetadataFile(conf, relativeRoot, footers, JobSummaryLevel.ALL);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetOutputFormatJobSummaryLevel.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetOutputFormatJobSummaryLevel.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetOutputFormatJobSummaryLevel.java
new file mode 100644
index 0000000..b81dd0c
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetOutputFormatJobSummaryLevel.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestParquetOutputFormatJobSummaryLevel {
+  @Test
+  public void testDefault() throws Exception {
+    Configuration conf = new Configuration();
+    // default should be ALL
+    assertEquals(JobSummaryLevel.ALL, ParquetOutputFormat.getJobSummaryLevel(conf));
+  }
+
+  @Test
+  public void testDeprecatedStillWorks() throws Exception {
+    Configuration conf = new Configuration();
+
+    conf.set(ParquetOutputFormat.ENABLE_JOB_SUMMARY, "true");
+    assertEquals(JobSummaryLevel.ALL, ParquetOutputFormat.getJobSummaryLevel(conf));
+
+    conf.set(ParquetOutputFormat.ENABLE_JOB_SUMMARY, "false");
+    assertEquals(JobSummaryLevel.NONE, ParquetOutputFormat.getJobSummaryLevel(conf));
+  }
+
+  @Test
+  public void testLevelParses() throws Exception {
+    Configuration conf = new Configuration();
+
+    conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "all");
+    assertEquals(JobSummaryLevel.ALL, ParquetOutputFormat.getJobSummaryLevel(conf));
+
+    conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "common_only");
+    assertEquals(JobSummaryLevel.COMMON_ONLY, ParquetOutputFormat.getJobSummaryLevel(conf));
+
+    conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "none");
+    assertEquals(JobSummaryLevel.NONE, ParquetOutputFormat.getJobSummaryLevel(conf));
+  }
+
+  @Test
+  public void testLevelTakesPrecedence() throws Exception {
+    Configuration conf = new Configuration();
+
+    conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "common_only");
+    conf.set(ParquetOutputFormat.ENABLE_JOB_SUMMARY, "false");
+    assertEquals(JobSummaryLevel.COMMON_ONLY, ParquetOutputFormat.getJobSummaryLevel(conf));
+  }
+
+}

