parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jul...@apache.org
Subject parquet-mr git commit: PARQUET-460: merge multi parquet files to one file
Date Tue, 16 Aug 2016 17:40:56 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 898f3d0f6 -> 255f10834


PARQUET-460: merge multi parquet files to one file

A merge command for parquet-tools based on https://issues.apache.org/jira/browse/PARQUET-382.

Author: flykobe <flykobecy@gmail.com>

Closes #327 from flykobe/merge_tool and squashes the following commits:

b031c18 [flykobe] check input files
da28832 [flykobe] merge multi parquet files to one file


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/255f1083
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/255f1083
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/255f1083

Branch: refs/heads/master
Commit: 255f10834a67cf13518316de0e2c8a345677ebbf
Parents: 898f3d0
Author: flykobe <flykobecy@gmail.com>
Authored: Tue Aug 16 10:40:52 2016 -0700
Committer: Julien Le Dem <julien@dremio.com>
Committed: Tue Aug 16 10:40:52 2016 -0700

----------------------------------------------------------------------
 .../parquet/tools/command/MergeCommand.java     | 157 +++++++++++++++++++
 .../apache/parquet/tools/command/Registry.java  |   1 +
 parquet-tools/src/main/scripts/parquet-merge    |  28 ++++
 3 files changed, 186 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/255f1083/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
----------------------------------------------------------------------
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
new file mode 100644
index 0000000..e6d9747
--- /dev/null
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.slf4j.Logger;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MergeCommand extends ArgsOnlyCommand {
+  public static final String[] USAGE = new String[] {
+          "<input> [<input> ...] <output>",
+          "where <input> is the source parquet files/directory to be merged",
+          "   <output> is the destination parquet file"
+  };
+
+  /**
+   * Biggest number of input files we can merge.
+   */
+  private static final int MAX_FILE_NUM = 100;
+
+  private Configuration conf;
+
+  public MergeCommand() {
+    super(2, MAX_FILE_NUM + 1);
+
+    conf = new Configuration();
+  }
+
+  @Override
+  public String[] getUsageDescription() {
+    return USAGE;
+  }
+
+  @Override
+  public void execute(CommandLine options) throws Exception {
+    // Prepare arguments
+    List<String> args = options.getArgList();
+    List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
+    Path outputFile = new Path(args.get(args.size() - 1));
+
+    // Merge schema and extraMeta
+    FileMetaData mergedMeta = mergedMetadata(inputFiles);
+
+    // Merge data
+    ParquetFileWriter writer = new ParquetFileWriter(conf,
+            mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+    writer.start();
+    for (Path input: inputFiles) {
+      writer.appendFile(conf, input);
+    }
+    writer.end(mergedMeta.getKeyValueMetaData());
+  }
+
+  private FileMetaData mergedMetadata(List<Path> inputFiles) throws IOException {
+    return ParquetFileWriter.mergeMetadataFiles(inputFiles, conf).getFileMetaData();
+  }
+
+  /**
+   * Get all input files.
+   * @param input input files or directory.
+   * @return ordered input files.
+   */
+  private List<Path> getInputFiles(List<String> input) throws IOException {
+    List<Path> inputFiles = null;
+
+    if (input.size() == 1) {
+      Path p = new Path(input.get(0));
+      FileSystem fs = p.getFileSystem(conf);
+      FileStatus status = fs.getFileStatus(p);
+
+      if (status.isDir()) {
+        inputFiles = getInputFilesFromDirectory(status);
+      }
+    } else {
+      inputFiles = parseInputFiles(input);
+    }
+
+    checkParquetFiles(inputFiles);
+
+    return inputFiles;
+  }
+
+  /**
+   * Check input files basically.
+   * ParquetFileReader will throw exception when reading an illegal parquet file.
+   *
+   * @param inputFiles files to be merged.
+   * @throws IOException
+   */
+  private void checkParquetFiles(List<Path> inputFiles) throws IOException {
+    if (inputFiles == null || inputFiles.size() <= 1) {
+      throw new IllegalArgumentException("Not enough files to merge");
+    }
+
+    for (Path inputFile: inputFiles) {
+      FileSystem fs = inputFile.getFileSystem(conf);
+      FileStatus status = fs.getFileStatus(inputFile);
+
+      if (status.isDir()) {
+        throw new IllegalArgumentException("Illegal parquet file: " + inputFile.toUri());
+      }
+    }
+  }
+
+  /**
+   * Get all parquet files under partition directory.
+   * @param partitionDir partition directory.
+   * @return parquet files to be merged.
+   */
+  private List<Path> getInputFilesFromDirectory(FileStatus partitionDir) throws IOException {
+    FileSystem fs = partitionDir.getPath().getFileSystem(conf);
+    FileStatus[] inputFiles = fs.listStatus(partitionDir.getPath(), HiddenFileFilter.INSTANCE);
+
+    List<Path> input = new ArrayList<Path>();
+    for (FileStatus f: inputFiles) {
+      input.add(f.getPath());
+    }
+    return input;
+  }
+
+  private List<Path> parseInputFiles(List<String> input) {
+    List<Path> inputFiles = new ArrayList<Path>();
+
+    for (String name: input) {
+      inputFiles.add(new Path(name));
+    }
+
+    return inputFiles;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/255f1083/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
----------------------------------------------------------------------
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
index d9c59cc..a722408 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
@@ -31,6 +31,7 @@ public final class Registry {
     registry.put("schema", ShowSchemaCommand.class);
     registry.put("meta", ShowMetaCommand.class);
     registry.put("dump", DumpCommand.class);
+    registry.put("merge", MergeCommand.class);
   }
 
   public static Map<String,Command> allCommands() {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/255f1083/parquet-tools/src/main/scripts/parquet-merge
----------------------------------------------------------------------
diff --git a/parquet-tools/src/main/scripts/parquet-merge b/parquet-tools/src/main/scripts/parquet-merge
new file mode 100755
index 0000000..995a105
--- /dev/null
+++ b/parquet-tools/src/main/scripts/parquet-merge
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# The name of the top-level script
+TOPSCRIPT="parquet-tools"
+
+# Determine the path to the script's directory
+APPPATH=$( cd "$(dirname "$0")" ; pwd -P )
+
+# Run the application
+exec "${APPPATH}/${TOPSCRIPT}" merge "$@"


Mime
View raw message