parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ziva...@apache.org
Subject [parquet-mr] branch master updated: PARQUET-1381: Add merge blocks command to parquet-tools (#512)
Date Tue, 11 Sep 2018 11:56:59 GMT
This is an automated email from the ASF dual-hosted git repository.

zivanfi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 863a081  PARQUET-1381: Add merge blocks command to parquet-tools (#512)
863a081 is described below

commit 863a081850e56bbbb38d7b68b478a3bd40779723
Author: Ekaterina Galieva <katerina.galieva@gmail.com>
AuthorDate: Tue Sep 11 04:56:57 2018 -0700

    PARQUET-1381: Add merge blocks command to parquet-tools (#512)
    
    The existing implementation of the merge command in parquet-tools did not merge row groups;
    it just placed them one after the other. This commit adds an API and a command option to
    merge small blocks into larger ones, up to a specified size limit.
---
 .../parquet/column/impl/ColumnReadStoreImpl.java   |   2 +-
 .../parquet/hadoop/ColumnChunkPageWriteStore.java  |   5 +
 .../apache/parquet/hadoop/ParquetFileReader.java   |  93 +++++--
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 115 +++++++++
 .../apache/parquet/hadoop/util/BlocksCombiner.java | 106 ++++++++
 .../hadoop/TestParquetWriterMergeBlocks.java       | 280 +++++++++++++++++++++
 .../apache/parquet/tools/command/MergeCommand.java |  75 +++++-
 7 files changed, 652 insertions(+), 24 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
index 3784596..e582908 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@ -75,7 +75,7 @@ public class ColumnReadStoreImpl implements ColumnReadStore {
     return newMemColumnReader(path, pageReadStore.getPageReader(path));
   }
 
-  private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader)
{
+  public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader)
{
     PrimitiveConverter converter = getPrimitiveConverter(path);
     return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 82c288f..5349dc2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -262,4 +262,9 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
     }
   }
 
+  /**
+   * Looks up the page writer registered for {@code path} and asks it to write itself to
+   * {@code writer}.
+   *
+   * @param path the column whose page writer is flushed
+   * @param writer the file writer handed to the page writer
+   * @throws IOException propagated from the page writer
+   */
+  void flushToFileWriter(ColumnDescriptor path, ParquetFileWriter writer) throws IOException {
+    writers.get(path).writeToFileWriter(writer);
+  }
+
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 15fe592..527c831 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -40,6 +40,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -56,6 +57,7 @@ import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -1160,27 +1162,8 @@ public class ParquetFileReader implements Closeable {
      * @throws IOException if there is an error while reading from the stream
      */
     public List<Chunk> readAll(SeekableInputStream f) throws IOException {
-      List<Chunk> result = new ArrayList<Chunk>(chunks.size());
-      f.seek(offset);
-
-      int fullAllocations = length / options.getMaxAllocationSize();
-      int lastAllocationSize = length % options.getMaxAllocationSize();
-
-      int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
-      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
-
-      for (int i = 0; i < fullAllocations; i += 1) {
-        buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
-      }
-
-      if (lastAllocationSize > 0) {
-        buffers.add(options.getAllocator().allocate(lastAllocationSize));
-      }
-
-      for (ByteBuffer buffer : buffers) {
-        f.readFully(buffer);
-        buffer.flip();
-      }
+      List<Chunk> result = new ArrayList<>(chunks.size());
+      List<ByteBuffer> buffers = readBlocks(f, offset, length);
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
@@ -1206,4 +1189,72 @@ public class ParquetFileReader implements Closeable {
 
   }
 
+  /**
+   * Seeks to {@code offset} and reads {@code length} bytes into newly allocated buffers, none
+   * of which is larger than {@code options.getMaxAllocationSize()}.
+   *
+   * @param f file to read the blocks from
+   * @param offset position to seek to before reading
+   * @param length total number of bytes to read
+   * @return the ByteBuffer blocks, each flipped after being filled
+   * @throws IOException if there is an error while reading from the stream
+   */
+  List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length) throws IOException {
+    f.seek(offset);
+
+    final int maxAllocation = options.getMaxAllocationSize();
+    final int wholeBuffers = length / maxAllocation;
+    final int remainder = length % maxAllocation;
+
+    List<ByteBuffer> buffers = new ArrayList<>(remainder > 0 ? wholeBuffers + 1 : wholeBuffers);
+    for (int allocated = 0; allocated < wholeBuffers; allocated++) {
+      buffers.add(options.getAllocator().allocate(maxAllocation));
+    }
+    // The tail buffer holds whatever does not fill a whole max-size allocation.
+    if (remainder > 0) {
+      buffers.add(options.getAllocator().allocate(remainder));
+    }
+
+    for (ByteBuffer buffer : buffers) {
+      f.readFully(buffer);
+      buffer.flip();
+    }
+    return buffers;
+  }
+
+  /**
+   * Reads the column chunk of {@code columnDescriptor} from the row group at {@code blockIndex}.
+   *
+   * @param blockIndex index into this reader's list of blocks
+   * @param columnDescriptor the column to read
+   * @return the page reader for the column chunk, or an empty Optional when the row group has
+   *         no column chunk with the descriptor's path
+   * @throws RuntimeException if the row group has 0 rows, or wrapping an IOException raised
+   *         while reading the chunk
+   * @throws ArithmeticException if the chunk's total size does not fit in an int
+   */
+  Optional<PageReader> readColumnInBlock(int blockIndex, ColumnDescriptor columnDescriptor) {
+    BlockMetaData block = blocks.get(blockIndex);
+    if (block.getRowCount() == 0) {
+      throw new RuntimeException("Illegal row group of 0 rows");
+    }
+    Optional<ColumnChunkMetaData> mc = findColumnByPath(block, columnDescriptor.getPath());
+
+    // Math.toIntExact fails loudly instead of silently truncating a size above
+    // Integer.MAX_VALUE, which a plain (int) cast would turn into a wrong read length.
+    return mc.map(column -> new ChunkDescriptor(columnDescriptor, column, column.getStartingPos(),
+        Math.toIntExact(column.getTotalSize())))
+      .map(chunk -> readChunk(f, chunk));
+  }
+
+  /**
+   * Reads the bytes described by {@code descriptor} from {@code f} and returns the result of
+   * {@code readAllPages()} on the chunk built from them.
+   *
+   * @throws RuntimeException wrapping any IOException raised while reading
+   */
+  private ColumnChunkPageReader readChunk(SeekableInputStream f, ChunkDescriptor descriptor) {
+    try {
+      ByteBufferInputStream chunkBytes =
+          ByteBufferInputStream.wrap(readBlocks(f, descriptor.fileOffset, descriptor.size));
+      Chunk chunk = new WorkaroundChunk(descriptor, chunkBytes.sliceBuffers(descriptor.size), f);
+      return chunk.readAllPages();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Finds the first column chunk of {@code block} whose path equals {@code path}.
+   *
+   * @return the matching column chunk metadata, or an empty Optional if no column matches
+   */
+  private Optional<ColumnChunkMetaData> findColumnByPath(BlockMetaData block, String[] path) {
+    return block.getColumns().stream()
+      .filter(candidate -> Arrays.equals(candidate.getPath().toArray(), path))
+      .findFirst();
+  }
+
+  /**
+   * @return the number of entries in this reader's list of blocks (row groups)
+   */
+  public int blocksCount() {
+    return blocks.size();
+  }
+
+  /**
+   * @param blockIndex index into this reader's list of blocks (row groups)
+   * @return the metadata of the block at {@code blockIndex}
+   */
+  public BlockMetaData getBlockMetaData(int blockIndex) {
+    return blocks.get(blockIndex);
+  }
+
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index c98c247..b944e97 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -25,12 +25,15 @@ import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,13 +44,22 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.Strings;
 import org.apache.parquet.Version;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -57,6 +69,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.BlocksCombiner;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.io.InputFile;
@@ -519,6 +532,108 @@ public class ParquetFileWriter {
     ParquetFileReader.open(file).appendTo(this);
   }
 
+  /**
+   * Rewrites the row groups of {@code inputFiles} into this writer, combining consecutive
+   * small row groups into larger ones as planned by
+   * {@link BlocksCombiner#combineLargeBlocks(List, long)}.
+   *
+   * <p>For every merged row group each column of this writer's schema is rebuilt value by
+   * value. A column that is absent from an input row group is filled with one null per row,
+   * using the levels of its closest parent path that exists in the input file's schema.
+   * Failures while processing an individual input row group are logged and not rethrown.
+   *
+   * @param inputFiles the files whose row groups are merged, in order
+   * @param compressor compressor handed to the page write store of every column
+   * @param createdBy created-by string handed to the column read store
+   * @param maxBlockSize upper bound passed to the block combiner for a merged row group
+   * @return always 0
+   * @throws IOException propagated from opening the inputs or from this writer
+   */
+  public int merge(List<InputFile> inputFiles, CodecFactory.BytesCompressor compressor, String createdBy, long maxBlockSize) throws IOException {
+    List<ParquetFileReader> readers = getReaders(inputFiles);
+    try {
+      ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+      ColumnReadStoreImpl columnReadStore = new ColumnReadStoreImpl(null, new DummyRecordConverter(schema).getRootConverter(), schema, createdBy);
+      this.start();
+      List<BlocksCombiner.SmallBlocksUnion> largeBlocks = BlocksCombiner.combineLargeBlocks(readers, maxBlockSize);
+      // The column list does not change while merging, so fetch it once.
+      List<ColumnDescriptor> columns = schema.getColumns();
+      for (BlocksCombiner.SmallBlocksUnion smallBlocks : largeBlocks) {
+        for (int columnIndex = 0; columnIndex < columns.size(); columnIndex++) {
+          ColumnDescriptor path = columns.get(columnIndex);
+          ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator);
+          ColumnWriteStoreV1 columnWriteStoreV1 = new ColumnWriteStoreV1(store, ParquetProperties.builder().build());
+          for (BlocksCombiner.SmallBlock smallBlock : smallBlocks.getBlocks()) {
+            ParquetFileReader parquetFileReader = smallBlock.getReader();
+            try {
+              Optional<PageReader> columnChunkPageReader = parquetFileReader.readColumnInBlock(smallBlock.getBlockIndex(), path);
+              ColumnWriter columnWriter = columnWriteStoreV1.getColumnWriter(path);
+              if (columnChunkPageReader.isPresent()) {
+                ColumnReader columnReader = columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
+                // A long counter cannot wrap around before reaching a large value count.
+                long valueCount = columnReader.getTotalValueCount();
+                for (long i = 0; i < valueCount; i++) {
+                  consumeTriplet(columnWriter, columnReader);
+                }
+              } else {
+                MessageType inputFileSchema = parquetFileReader.getFileMetaData().getSchema();
+                String[] parentPath = getExisingParentPath(path, inputFileSchema);
+                int def = inputFileSchema.getMaxDefinitionLevel(parentPath);
+                int rep = inputFileSchema.getMaxRepetitionLevel(parentPath);
+                // The row count is a long, so the counter is a long as well.
+                long rowCount = parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount();
+                for (long i = 0; i < rowCount; i++) {
+                  columnWriter.writeNull(rep, def);
+                }
+              }
+            } catch (Exception e) {
+              // NOTE(review): the failure is only logged; the row group is still written with
+              // the row count announced in startBlock(), so the column may end up short.
+              LOG.error("File {} is not readable", parquetFileReader.getFile(), e);
+            }
+          }
+          // The block is started only after the first column has been fully buffered.
+          if (columnIndex == 0) {
+            this.startBlock(smallBlocks.getRowCount());
+          }
+          columnWriteStoreV1.flush();
+          store.flushToFileWriter(path, this);
+        }
+        this.endBlock();
+      }
+      this.end(Collections.emptyMap());
+    } finally {
+      BlocksCombiner.closeReaders(readers);
+    }
+    return 0;
+  }
+
+  /**
+   * Returns the longest leading part of the column's path that {@code inputFileSchema}
+   * contains; the result is empty when not even the first path element is contained.
+   */
+  private String[] getExisingParentPath(ColumnDescriptor path, MessageType inputFileSchema) {
+    String[] fullPath = path.getPath();
+    int prefixLength = fullPath.length;
+    // Drop trailing path elements until the remaining prefix is known to the input schema.
+    while (prefixLength > 0 && !inputFileSchema.containsPath(Arrays.copyOf(fullPath, prefixLength))) {
+      prefixLength--;
+    }
+    return Arrays.copyOf(fullPath, prefixLength);
+  }
+
+  /**
+   * Opens a {@link ParquetFileReader} for every input file, in order.
+   *
+   * @param inputFiles the files to open
+   * @return the opened readers; the caller is responsible for closing them
+   * @throws IOException if a file cannot be opened; readers opened so far are closed first
+   */
+  private List<ParquetFileReader> getReaders(List<InputFile> inputFiles) throws IOException {
+    List<ParquetFileReader> readers = new ArrayList<>(inputFiles.size());
+    try {
+      for (InputFile inputFile : inputFiles) {
+        readers.add(ParquetFileReader.open(inputFile));
+      }
+    } catch (IOException | RuntimeException e) {
+      // The caller never receives the partial list, so release what was already opened here.
+      BlocksCombiner.closeReaders(readers);
+      throw e;
+    }
+    return readers;
+  }
+
+  /**
+   * Copies the reader's current triplet (repetition level, definition level, value) to the
+   * writer and then advances the reader with {@code consume()}.
+   *
+   * @throws IllegalArgumentException if the column's primitive type is not handled below
+   */
+  private void consumeTriplet(ColumnWriter columnWriter, ColumnReader columnReader) {
+    final int def = columnReader.getCurrentDefinitionLevel();
+    final int rep = columnReader.getCurrentRepetitionLevel();
+    final ColumnDescriptor descriptor = columnReader.getDescriptor();
+    final PrimitiveType primitive = descriptor.getPrimitiveType();
+
+    // Below the maximum definition level there is no value to read, only a null to record.
+    if (def < descriptor.getMaxDefinitionLevel()) {
+      columnWriter.writeNull(rep, def);
+      columnReader.consume();
+      return;
+    }
+
+    switch (primitive.getPrimitiveTypeName()) {
+      case BOOLEAN:
+        columnWriter.write(columnReader.getBoolean(), rep, def);
+        break;
+      case INT32:
+        columnWriter.write(columnReader.getInteger(), rep, def);
+        break;
+      case INT64:
+        columnWriter.write(columnReader.getLong(), rep, def);
+        break;
+      case FLOAT:
+        columnWriter.write(columnReader.getFloat(), rep, def);
+        break;
+      case DOUBLE:
+        columnWriter.write(columnReader.getDouble(), rep, def);
+        break;
+      case INT96:
+      case FIXED_LEN_BYTE_ARRAY:
+      case BINARY:
+        columnWriter.write(columnReader.getBinary(), rep, def);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown primitive type " + primitive);
+    }
+    columnReader.consume();
+  }
+
   /**
    * @param file a file stream to read from
    * @param rowGroups row groups to copy
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
new file mode 100644
index 0000000..02dadc7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Collections.unmodifiableList;
+
+/**
+ * Plans how the blocks (row groups) of several Parquet files are grouped into larger blocks,
+ * and offers a helper to close the readers involved.
+ */
+public class BlocksCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BlocksCombiner.class);
+
+  /**
+   * Walks over the blocks of all readers, in reader order and block order, and groups
+   * consecutive blocks into unions. A new union is started when adding the next block would
+   * push the accumulated {@code getTotalByteSize()} above {@code maxBlockSize}; a block that
+   * exceeds the limit on its own still forms a union of its own.
+   *
+   * @param readers the readers whose blocks are grouped
+   * @param maxBlockSize upper bound for the summed total byte size of a union
+   * @return an unmodifiable list of unions, each carrying its blocks and their summed row count
+   */
+  public static List<SmallBlocksUnion> combineLargeBlocks(List<ParquetFileReader> readers, long maxBlockSize) {
+    List<SmallBlocksUnion> blocks = new ArrayList<>();
+    long largeBlockSize = 0;
+    long largeBlockRecords = 0;
+    List<SmallBlock> smallBlocks = new ArrayList<>();
+    for (ParquetFileReader reader : readers) {
+      for (int blockIndex = 0; blockIndex < reader.blocksCount(); blockIndex++) {
+        BlockMetaData block = reader.getBlockMetaData(blockIndex);
+        // Close the current union before it would grow past the limit; never emit an empty one.
+        if (!smallBlocks.isEmpty() && largeBlockSize + block.getTotalByteSize() > maxBlockSize) {
+          blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
+          smallBlocks = new ArrayList<>();
+          largeBlockSize = 0;
+          largeBlockRecords = 0;
+        }
+        largeBlockSize += block.getTotalByteSize();
+        largeBlockRecords += block.getRowCount();
+        smallBlocks.add(new SmallBlock(reader, blockIndex));
+      }
+    }
+    if (!smallBlocks.isEmpty()) {
+      blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
+    }
+    return unmodifiableList(blocks);
+  }
+
+  /**
+   * Closes every reader. An IOException from one reader is logged and does not prevent the
+   * remaining readers from being closed.
+   *
+   * @param readers the readers to close
+   */
+  public static void closeReaders(List<ParquetFileReader> readers) {
+    for (ParquetFileReader reader : readers) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        LOG.error("Error closing reader {}", reader.getFile(), e);
+      }
+    }
+  }
+
+  /**
+   * A group of small blocks that are to be written as one block, with their summed row count.
+   */
+  public static class SmallBlocksUnion {
+    private final List<SmallBlock> blocks;
+    private final long rowCount;
+
+    /**
+     * @param blocks the blocks that make up this union; the list is copied
+     * @param rowCount the summed row count of {@code blocks}
+     */
+    public SmallBlocksUnion(List<SmallBlock> blocks, long rowCount) {
+      // Copy and wrap so that neither the creator nor a caller of getBlocks() can alter the plan.
+      this.blocks = unmodifiableList(new ArrayList<>(blocks));
+      this.rowCount = rowCount;
+    }
+
+    /**
+     * @return an unmodifiable list of the blocks in this union
+     */
+    public List<SmallBlock> getBlocks() {
+      return blocks;
+    }
+
+    /**
+     * @return the row count given at construction
+     */
+    public long getRowCount() {
+      return rowCount;
+    }
+  }
+
+  /**
+   * Identifies one block by the reader it belongs to and its index within that reader.
+   */
+  public static class SmallBlock {
+    private final ParquetFileReader reader;
+    private final int blockIndex;
+
+    /**
+     * @param reader the reader that owns the block
+     * @param blockIndex index of the block within {@code reader}
+     */
+    public SmallBlock(ParquetFileReader reader, int blockIndex) {
+      this.reader = reader;
+      this.blockIndex = blockIndex;
+    }
+
+    /**
+     * @return the reader that owns the block
+     */
+    public ParquetFileReader getReader() {
+      return reader;
+    }
+
+    /**
+     * @return the index of the block within its reader
+     */
+    public int getBlockIndex() {
+      return blockIndex;
+    }
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java
new file mode 100644
index 0000000..a972238
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+
+/**
+ * Tests for {@link ParquetFileWriter#merge}: two source files with the same schema are merged
+ * into one output file and the records and footer metadata of the result are checked.
+ */
+public class TestParquetWriterMergeBlocks {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  // Number of records written to each source file.
+  public static final int FILE_SIZE = 10000;
+  public static final Configuration CONF = new Configuration();
+  public static final Map<String, String> EMPTY_METADATA =
+    new HashMap<String, String>();
+  public static final MessageType FILE_SCHEMA = Types.buildMessage()
+    .required(INT32).named("id")
+    .required(BINARY).as(UTF8).named("string")
+    .named("AppendTest");
+  public static final SimpleGroupFactory GROUP_FACTORY =
+    new SimpleGroupFactory(FILE_SCHEMA);
+
+  public Path file1;
+  public List<Group> file1content = new ArrayList<Group>();
+  public Path file2;
+  public List<Group> file2content = new ArrayList<Group>();
+
+  /**
+   * Writes FILE_SIZE records to each of two files and remembers the written records.
+   * File 1 gets the ids 0..FILE_SIZE-1, file 2 the ids FILE_SIZE..2*FILE_SIZE-1.
+   */
+  @Before
+  public void createSourceData() throws IOException {
+    this.file1 = newTemp();
+    this.file2 = newTemp();
+
+    // try-with-resources closes both writers even if writing a record fails.
+    try (ParquetWriter<Group> writer1 = ExampleParquetWriter.builder(file1)
+           .withType(FILE_SCHEMA)
+           .build();
+         ParquetWriter<Group> writer2 = ExampleParquetWriter.builder(file2)
+           .withType(FILE_SCHEMA)
+           .build()) {
+      for (int i = 0; i < FILE_SIZE; i += 1) {
+        Group group1 = GROUP_FACTORY.newGroup();
+        group1.add("id", i);
+        group1.add("string", UUID.randomUUID().toString());
+        writer1.write(group1);
+        file1content.add(group1);
+
+        Group group2 = GROUP_FACTORY.newGroup();
+        group2.add("id", FILE_SIZE+i);
+        group2.add("string", UUID.randomUUID().toString());
+        writer2.write(group2);
+        file2content.add(group2);
+      }
+    }
+  }
+
+  /**
+   * Merges both source files and checks that the output holds all records of file 1 followed
+   * by all records of file 2.
+   */
+  @Test
+  public void testBasicBehavior() throws IOException {
+    Path combinedFile = newTemp();
+    ParquetFileWriter writer = new ParquetFileWriter(
+      CONF, FILE_SCHEMA, combinedFile);
+
+    // Merge schema and extraMeta
+    List<Path> inputFiles = asList(file1, file2);
+    FileMetaData mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles, CONF).getFileMetaData();
+    List<InputFile> inputFileList = toInputFiles(inputFiles);
+    CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(CompressionCodecName.SNAPPY);
+
+    writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), 128 * 1024 * 1024);
+
+    LinkedList<Group> expected = new LinkedList<>();
+    expected.addAll(file1content);
+    expected.addAll(file2content);
+
+    try (ParquetReader<Group> reader = ParquetReader
+           .builder(new GroupReadSupport(), combinedFile)
+           .build()) {
+      Group next;
+      while ((next = reader.read()) != null) {
+        Group expectedNext = expected.removeFirst();
+        // check each value; equals is not supported for simple records
+        Assert.assertEquals("Each id should match",
+          expectedNext.getInteger("id", 0), next.getInteger("id", 0));
+        Assert.assertEquals("Each string should match",
+          expectedNext.getString("string", 0), next.getString("string", 0));
+      }
+    }
+
+    Assert.assertEquals("All records should be present", 0, expected.size());
+  }
+
+  /**
+   * Converts Hadoop paths to input files, rethrowing any failure as a RuntimeException.
+   */
+  private List<InputFile> toInputFiles(List<Path> inputFiles) {
+    return inputFiles.stream()
+      .map(input -> {
+        try {
+          return HadoopInputFile.fromPath(input, CONF);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }).collect(Collectors.toList());
+  }
+
+  /**
+   * Merges both source files and checks the footer of the result: a single row group whose row
+   * count is the sum of all source row groups and whose columns match the source columns.
+   */
+  @Test
+  public void testMergedMetadata() throws IOException {
+    Path combinedFile = newTemp();
+    ParquetFileWriter writer = new ParquetFileWriter(
+      CONF, FILE_SCHEMA, combinedFile);
+
+    // Merge schema and extraMeta
+    List<Path> inputFiles = asList(file1, file2);
+    FileMetaData mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles, CONF).getFileMetaData();
+    List<InputFile> inputFileList = toInputFiles(inputFiles);
+    CompressionCodecName codecName = CompressionCodecName.GZIP;
+    CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(codecName);
+    writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), 128 * 1024 * 1024);
+
+    ParquetMetadata combinedFooter = ParquetFileReader.readFooter(
+      CONF, combinedFile, NO_FILTER);
+    ParquetMetadata f1Footer = ParquetFileReader.readFooter(
+      CONF, file1, NO_FILTER);
+    ParquetMetadata f2Footer = ParquetFileReader.readFooter(
+      CONF, file2, NO_FILTER);
+
+    LinkedList<BlockMetaData> expectedRowGroups = new LinkedList<>();
+    expectedRowGroups.addAll(f1Footer.getBlocks());
+    expectedRowGroups.addAll(f2Footer.getBlocks());
+    long totalRowCount = expectedRowGroups.stream().mapToLong(BlockMetaData::getRowCount).sum();
+    Assert.assertEquals("Combined should have a single row group",
+      1,
+      combinedFooter.getBlocks().size());
+
+    BlockMetaData rowGroup = combinedFooter.getBlocks().get(0);
+    Assert.assertEquals("Row count should match",
+      totalRowCount, rowGroup.getRowCount());
+    assertColumnsEquivalent(f1Footer.getBlocks().get(0).getColumns(), rowGroup.getColumns(), codecName);
+  }
+
+  /**
+   * Checks that {@code actual} has as many columns as {@code expected}, that each column chunk
+   * starts right after the previous one, and that the per-column metadata matches.
+   */
+  public void assertColumnsEquivalent(List<ColumnChunkMetaData> expected,
+                                      List<ColumnChunkMetaData> actual,
+                                      CompressionCodecName codecName) {
+    Assert.assertEquals("Should have the expected columns",
+      expected.size(), actual.size());
+    for (int i = 0; i < actual.size(); i += 1) {
+      // NOTE(review): numNulls and valueCount are taken from the actual column and later
+      // compared with that same column, so these two checks cannot fail.
+      long numNulls = 0;
+      long valueCount = 0;
+      ColumnChunkMetaData current = actual.get(i);
+      Statistics statistics = current.getStatistics();
+      numNulls += statistics.getNumNulls();
+      valueCount += current.getValueCount();
+      if (i != 0) {
+        ColumnChunkMetaData previous = actual.get(i - 1);
+        long expectedStart = previous.getStartingPos() + previous.getTotalSize();
+        Assert.assertEquals("Should start after the previous column",
+          expectedStart, current.getStartingPos());
+      }
+
+      assertColumnMetadataEquivalent(expected.get(i), current, codecName, numNulls, valueCount);
+    }
+  }
+
+  /**
+   * Compares path, primitive type and encodings of {@code actual} with {@code expected}, and
+   * its codec, null count and value count with the given values.
+   */
+  public void assertColumnMetadataEquivalent(ColumnChunkMetaData expected,
+                                             ColumnChunkMetaData actual,
+                                             CompressionCodecName codecName,
+                                             long numNulls,
+                                             long valueCount) {
+    // Compare against the actual column; comparing expected with itself can never fail.
+    Assert.assertEquals("Should be the expected column",
+      expected.getPath(), actual.getPath());
+    Assert.assertEquals("Primitive type should not change",
+      expected.getType(), actual.getType());
+    Assert.assertEquals("Compression codec should not change",
+      codecName, actual.getCodec());
+    Assert.assertEquals("Data encodings should not change",
+      expected.getEncodings(), actual.getEncodings());
+    Assert.assertEquals("Statistics should not change",
+      numNulls, actual.getStatistics().getNumNulls());
+    Assert.assertEquals("Number of values should not change",
+      valueCount, actual.getValueCount());
+
+  }
+
+  /**
+   * Merges with an output schema that only has the "string" column and checks that the "id"
+   * column is dropped while all string values are kept in order.
+   */
+  @Test
+  public void testAllowDroppingColumns() throws IOException {
+    MessageType droppedColumnSchema = Types.buildMessage()
+      .required(BINARY).as(UTF8).named("string")
+      .named("AppendTest");
+
+    Path droppedColumnFile = newTemp();
+    List<Path> inputFiles = asList(file1, file2);
+    ParquetFileWriter writer = new ParquetFileWriter(
+      CONF, droppedColumnSchema, droppedColumnFile);
+    List<InputFile> inputFileList = toInputFiles(inputFiles);
+    CompressionCodecName codecName = CompressionCodecName.GZIP;
+    CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(codecName);
+    writer.merge(inputFileList, compressor, "", 128*1024*1024);
+
+    LinkedList<Group> expected = new LinkedList<Group>();
+    expected.addAll(file1content);
+    expected.addAll(file2content);
+
+    ParquetMetadata footer = ParquetFileReader.readFooter(
+      CONF, droppedColumnFile, NO_FILTER);
+    for (BlockMetaData rowGroup : footer.getBlocks()) {
+      Assert.assertEquals("Should have only the string column",
+        1, rowGroup.getColumns().size());
+    }
+
+    try (ParquetReader<Group> reader = ParquetReader
+           .builder(new GroupReadSupport(), droppedColumnFile)
+           .build()) {
+      Group next;
+      while ((next = reader.read()) != null) {
+        Group expectedNext = expected.removeFirst();
+        Assert.assertEquals("Each string should match",
+          expectedNext.getString("string", 0), next.getString("string", 0));
+      }
+    }
+
+    Assert.assertEquals("All records should be present", 0, expected.size());
+  }
+
+  /**
+   * Returns the path of a fresh temp file that has been deleted again, so that a writer can
+   * create it.
+   */
+  private Path newTemp() throws IOException {
+    File file = temp.newFile();
+    Preconditions.checkArgument(file.delete(), "Could not remove temp file");
+    return new Path(file.toString());
+  }
+}
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
index fe64587..6d5b313 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -19,20 +19,29 @@
 package org.apache.parquet.tools.command;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.InputFile;
 import org.apache.parquet.tools.Main;
 
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
 
 public class MergeCommand extends ArgsOnlyCommand {
   public static final String[] USAGE = new String[] {
@@ -49,6 +58,32 @@ public class MergeCommand extends ArgsOnlyCommand {
 
   private Configuration conf;
 
+  private static final Options OPTIONS;
+  static {
+    OPTIONS = new Options();
+
+    Option block = Option.builder("b") // flag only: no hasArg(), so it takes no value
+      .longOpt("block")
+      .desc("Merge adjacent blocks into one up to upper bound size limit default to 128 MB")
+      .build();
+
+    Option limit = Option.builder("l")
+      .longOpt("limit")
+      .desc("Upper bound for merged block size in megabytes. Default: 128 MB")
+      .hasArg()
+      .build();
+
+    Option codec = Option.builder("c")
+      .longOpt("codec")
+      .desc("Compression codec name. Default: SNAPPY. Valid values: UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD")
+      .hasArg()
+      .build();
+
+    OPTIONS.addOption(limit);
+    OPTIONS.addOption(block);
+    OPTIONS.addOption(codec);
+  }
+
   public MergeCommand() {
     super(2, MAX_FILE_NUM + 1);
 
@@ -56,6 +91,11 @@ public class MergeCommand extends ArgsOnlyCommand {
   }
 
   @Override
+  public Options getOptions() {
+    return OPTIONS;
+  }
+
+  @Override
   public String[] getUsageDescription() {
     return USAGE;
   }
@@ -63,18 +103,32 @@ public class MergeCommand extends ArgsOnlyCommand {
   @Override
   public String getCommandDescription() {
     return "Merges multiple Parquet files into one. " +
-      "The command doesn't merge row groups, just places one after the other. " +
+      "Without -b option the command doesn't merge row groups, just places one after the other. " +
       "When used to merge many small files, the resulting file will still contain small row groups, " +
-      "which usually leads to bad query performance.";
+      "which usually leads to bad query performance. " +
+      "To have adjacent small blocks merged together use -b option. " +
+      "Blocks will be grouped into larger one until the upper bound is reached. " +
+      "Default block upper bound 128 MB and default compression SNAPPY can be customized using -l and -c options";
   }
 
   @Override
   public void execute(CommandLine options) throws Exception {
+    boolean mergeBlocks = options.hasOption('b');
+    int maxBlockSize = options.hasOption('l') ? Math.multiplyExact(Integer.parseInt(options.getOptionValue('l')), 1024 * 1024) : DEFAULT_BLOCK_SIZE; // -l is given in MB; multiplyExact throws instead of silently wrapping past Integer.MAX_VALUE
+    CompressionCodecName compressionCodec = options.hasOption('c') ? CompressionCodecName.valueOf(options.getOptionValue('c')) : CompressionCodecName.SNAPPY;
     // Prepare arguments
     List<String> args = options.getArgList();
     List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
     Path outputFile = new Path(args.get(args.size() - 1));
+    if (mergeBlocks) {
+      CodecFactory.BytesCompressor compressor = new CodecFactory(conf, DEFAULT_PAGE_SIZE).getCompressor(compressionCodec);
+      mergeBlocks(maxBlockSize, compressor, inputFiles, outputFile);
+    } else {
+      mergeFiles(inputFiles, outputFile);
+    }
+  }
 
+  private void mergeFiles(List<Path> inputFiles, Path outputFile) throws IOException {
     // Merge schema and extraMeta
     FileMetaData mergedMeta = mergedMetadata(inputFiles);
     PrintWriter out = new PrintWriter(Main.out, true);
@@ -103,6 +157,23 @@ public class MergeCommand extends ArgsOnlyCommand {
     writer.end(mergedMeta.getKeyValueMetaData());
   }
 
+  private void mergeBlocks(int maxBlockSize, CodecFactory.BytesCompressor compressor, List<Path> inputFiles, Path outputFile) throws IOException {
+    // Merge schema and extraMeta
+    FileMetaData mergedMeta = mergedMetadata(inputFiles);
+
+    // Merge data
+    ParquetFileWriter writer = new ParquetFileWriter(conf, mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+    List<InputFile> inputFileList = inputFiles.stream()
+      .map(input -> {
+        try {
+          return HadoopInputFile.fromPath(input, conf);
+        } catch (Exception e) {
+          throw new RuntimeException(e); // the lambda passed to map() cannot throw checked exceptions
+        }
+      }).collect(Collectors.toList());
+    writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), maxBlockSize);
+  }
+
   private FileMetaData mergedMetadata(List<Path> inputFiles) throws IOException {
     return ParquetFileWriter.mergeMetadataFiles(inputFiles, conf).getFileMetaData();
   }


Mime
View raw message