parquet-commits mailing list archives

From: ga...@apache.org
Subject: [parquet-mr] branch master updated: Revert "PARQUET-1381: Add merge blocks command to parquet-tools (#512)" (#621)
Date: Mon, 25 Feb 2019 12:42:51 GMT
This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new ab42fe5  Revert "PARQUET-1381: Add merge blocks command to parquet-tools (#512)" (#621)
ab42fe5 is described below

commit ab42fe5180366120336fb3f8b9e6540aadb5da1b
Author: Gabor Szadovszky <gabor@apache.org>
AuthorDate: Mon Feb 25 13:42:46 2019 +0100

    Revert "PARQUET-1381: Add merge blocks command to parquet-tools (#512)" (#621)
    
    This reverts commit 863a081850e56bbbb38d7b68b478a3bd40779723.
    
    The design of this feature has conceptual problems and the implementation also works incorrectly. See PARQUET-1381 for more details.
---
 .../parquet/column/impl/ColumnReadStoreImpl.java   |   2 +-
 .../parquet/hadoop/ColumnChunkPageWriteStore.java  |   5 -
 .../apache/parquet/hadoop/ParquetFileReader.java   |  92 ++-----
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 123 ---------
 .../apache/parquet/hadoop/util/BlocksCombiner.java | 106 --------
 .../hadoop/TestParquetWriterMergeBlocks.java       | 280 ---------------------
 .../apache/parquet/tools/command/MergeCommand.java |  75 +-----
 7 files changed, 24 insertions(+), 659 deletions(-)
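
With this revert, parquet-tools merge is back to plain concatenation: each input row group is appended to the output file unchanged, so merging many small files still yields many small row groups. For context, a minimal sketch of the concatenation path that remains (the mergeFiles route kept in MergeCommand below), assuming the public ParquetFileWriter and HadoopInputFile APIs of this codebase; conf, inputFiles, outputFile and mergedMeta are illustrative placeholders, not code copied from this patch:

    // Sketch only: concatenation-style merge; row groups are copied as-is, never combined.
    // Assumed imports: java.io.IOException, java.util.List,
    // org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path,
    // org.apache.parquet.hadoop.ParquetFileWriter,
    // org.apache.parquet.hadoop.metadata.FileMetaData,
    // org.apache.parquet.hadoop.util.HadoopInputFile.
    static void concatenate(Configuration conf, List<Path> inputFiles, Path outputFile,
                            FileMetaData mergedMeta) throws IOException {
      ParquetFileWriter writer = new ParquetFileWriter(
          conf, mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
      writer.start();
      for (Path input : inputFiles) {
        // appendFile copies the row groups of each input file into the output unchanged.
        writer.appendFile(HadoopInputFile.fromPath(input, conf));
      }
      writer.end(mergedMeta.getKeyValueMetaData());
    }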

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
index b7e1597..755985d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@ -85,7 +85,7 @@ public class ColumnReadStoreImpl implements ColumnReadStore {
     }
   }
 
-  public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
+  private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
     PrimitiveConverter converter = getPrimitiveConverter(path);
     return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index f87630b..f85d374 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -292,9 +292,4 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
     }
   }
 
-  void flushToFileWriter(ColumnDescriptor path, ParquetFileWriter writer) throws IOException {
-    ColumnChunkPageWriter pageWriter = writers.get(path);
-    pageWriter.writeToFileWriter(writer);
-  }
-
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 01867c6..8e205f6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -42,7 +42,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -67,7 +66,6 @@ import org.apache.parquet.column.page.DataPageV1;
 import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
-import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.filter2.compat.FilterCompat;
@@ -1408,7 +1406,27 @@ public class ParquetFileReader implements Closeable {
      * @throws IOException if there is an error while reading from the stream
      */
     public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
-      List<ByteBuffer> buffers = readBlocks(f, offset, length);
+      List<Chunk> result = new ArrayList<Chunk>(chunks.size());
+      f.seek(offset);
+
+      int fullAllocations = length / options.getMaxAllocationSize();
+      int lastAllocationSize = length % options.getMaxAllocationSize();
+
+      int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
+      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+
+      for (int i = 0; i < fullAllocations; i += 1) {
+        buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+      }
+
+      if (lastAllocationSize > 0) {
+        buffers.add(options.getAllocator().allocate(lastAllocationSize));
+      }
+
+      for (ByteBuffer buffer : buffers) {
+        f.readFully(buffer);
+        buffer.flip();
+      }
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
@@ -1428,72 +1446,4 @@ public class ParquetFileReader implements Closeable {
 
   }
 
-  /**
-   * @param f file to read the blocks from
-   * @return the ByteBuffer blocks
-   * @throws IOException if there is an error while reading from the stream
-   */
-  List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length) throws IOException {
-    f.seek(offset);
-
-    int fullAllocations = length / options.getMaxAllocationSize();
-    int lastAllocationSize = length % options.getMaxAllocationSize();
-
-    int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
-    List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
-
-    for (int i = 0; i < fullAllocations; i++) {
-      buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
-    }
-
-    if (lastAllocationSize > 0) {
-      buffers.add(options.getAllocator().allocate(lastAllocationSize));
-    }
-
-    for (ByteBuffer buffer : buffers) {
-      f.readFully(buffer);
-      buffer.flip();
-    }
-    return buffers;
-  }
-
-  Optional<PageReader> readColumnInBlock(int blockIndex, ColumnDescriptor columnDescriptor) {
-    BlockMetaData block = blocks.get(blockIndex);
-    if (block.getRowCount() == 0) {
-      throw new RuntimeException("Illegal row group of 0 rows");
-    }
-    Optional<ColumnChunkMetaData> mc = findColumnByPath(block, columnDescriptor.getPath());
-
-    return mc.map(column -> new ChunkDescriptor(columnDescriptor, column, column.getStartingPos(), (int) column.getTotalSize()))
-      .map(chunk -> readChunk(f, chunk));
-  }
-
-  private ColumnChunkPageReader readChunk(SeekableInputStream f, ChunkDescriptor descriptor) {
-    try {
-      List<ByteBuffer> buffers = readBlocks(f, descriptor.fileOffset, descriptor.size);
-      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
-      Chunk chunk = new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f, null);
-      return chunk.readAllPages();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private Optional<ColumnChunkMetaData> findColumnByPath(BlockMetaData block, String[] path) {
-    for (ColumnChunkMetaData column : block.getColumns()) {
-      if (Arrays.equals(column.getPath().toArray(), path)) {
-        return Optional.of(column);
-      }
-    }
-    return Optional.empty();
-  }
-
-  public int blocksCount() {
-    return blocks.size();
-  }
-
-  public BlockMetaData getBlockMetaData(int blockIndex) {
-    return blocks.get(blockIndex);
-  }
-
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 14e3729..c875702 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -26,15 +26,12 @@ import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -45,23 +42,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.Strings;
 import org.apache.parquet.Version;
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnReader;
-import org.apache.parquet.column.ColumnWriteStore;
-import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.impl.ColumnReadStoreImpl;
-import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.Util;
@@ -72,7 +60,6 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.BlocksCombiner;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
@@ -669,116 +656,6 @@ public class ParquetFileWriter {
     }
   }
 
-  public int merge(List<InputFile> inputFiles, CodecFactory.BytesCompressor compressor, String createdBy, long maxBlockSize) throws IOException {
-    List<ParquetFileReader> readers = getReaders(inputFiles);
-    try {
-      ByteBufferAllocator allocator = new HeapByteBufferAllocator();
-      ColumnReadStoreImpl columnReadStore = new ColumnReadStoreImpl(null, new DummyRecordConverter(schema).getRootConverter(), schema, createdBy);
-      this.start();
-      List<BlocksCombiner.SmallBlocksUnion> largeBlocks = BlocksCombiner.combineLargeBlocks(readers, maxBlockSize);
-      for (BlocksCombiner.SmallBlocksUnion smallBlocks : largeBlocks) {
-        for (int columnIndex = 0; columnIndex < schema.getColumns().size(); columnIndex++) {
-          ColumnDescriptor path = schema.getColumns().get(columnIndex);
-          ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
-          ColumnWriteStoreV1 columnWriteStoreV1 = new ColumnWriteStoreV1(schema, store, ParquetProperties.builder().build());
-          for (BlocksCombiner.SmallBlock smallBlock : smallBlocks.getBlocks()) {
-            ParquetFileReader parquetFileReader = smallBlock.getReader();
-            try {
-              Optional<PageReader> columnChunkPageReader = parquetFileReader.readColumnInBlock(smallBlock.getBlockIndex(), path);
-              ColumnWriter columnWriter = columnWriteStoreV1.getColumnWriter(path);
-              if (columnChunkPageReader.isPresent()) {
-                ColumnReader columnReader = columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
-                for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
-                  consumeTriplet(columnWriteStoreV1, columnWriter, columnReader);
-                }
-              } else {
-                MessageType inputFileSchema = parquetFileReader.getFileMetaData().getSchema();
-                String[] parentPath = getExisingParentPath(path, inputFileSchema);
-                int def = parquetFileReader.getFileMetaData().getSchema().getMaxDefinitionLevel(parentPath);
-                int rep = parquetFileReader.getFileMetaData().getSchema().getMaxRepetitionLevel(parentPath);
-                for (int i = 0; i < parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount(); i++) {
-                  columnWriter.writeNull(rep, def);
-                  if (def == 0) {
-                    // V1 pages also respect record boundaries so we have to mark them
-                    columnWriteStoreV1.endRecord();
-                  }
-                }
-              }
-            } catch (Exception e) {
-              LOG.error("File {} is not readable", parquetFileReader.getFile(), e);
-            }
-          }
-          if (columnIndex == 0) {
-            this.startBlock(smallBlocks.getRowCount());
-          }
-          columnWriteStoreV1.flush();
-          store.flushToFileWriter(path, this);
-        }
-        this.endBlock();
-      }
-      this.end(Collections.emptyMap());
-    }finally {
-      BlocksCombiner.closeReaders(readers);
-    }
-    return 0;
-  }
-
-  private String[] getExisingParentPath(ColumnDescriptor path, MessageType inputFileSchema) {
-    List<String> parentPath = Arrays.asList(path.getPath());
-    while (parentPath.size() > 0 && !inputFileSchema.containsPath(parentPath.toArray(new String[parentPath.size()]))) {
-      parentPath = parentPath.subList(0, parentPath.size() - 1);
-    }
-    return parentPath.toArray(new String[parentPath.size()]);
-  }
-
-  private List<ParquetFileReader> getReaders(List<InputFile> inputFiles) throws IOException {
-    List<ParquetFileReader> readers = new ArrayList<>(inputFiles.size());
-    for (InputFile inputFile : inputFiles) {
-      readers.add(ParquetFileReader.open(inputFile));
-    }
-    return readers;
-  }
-
-  private void consumeTriplet(ColumnWriteStore columnWriteStore, ColumnWriter columnWriter, ColumnReader columnReader) {
-    int definitionLevel = columnReader.getCurrentDefinitionLevel();
-    int repetitionLevel = columnReader.getCurrentRepetitionLevel();
-    ColumnDescriptor column = columnReader.getDescriptor();
-    PrimitiveType type = column.getPrimitiveType();
-    if (definitionLevel < column.getMaxDefinitionLevel()) {
-      columnWriter.writeNull(repetitionLevel, definitionLevel);
-    } else {
-      switch (type.getPrimitiveTypeName()) {
-        case INT32:
-          columnWriter.write(columnReader.getInteger(), repetitionLevel, definitionLevel);
-          break;
-        case INT64:
-          columnWriter.write(columnReader.getLong(), repetitionLevel, definitionLevel);
-          break;
-        case BINARY:
-        case FIXED_LEN_BYTE_ARRAY:
-        case INT96:
-          columnWriter.write(columnReader.getBinary(), repetitionLevel, definitionLevel);
-          break;
-        case BOOLEAN:
-          columnWriter.write(columnReader.getBoolean(), repetitionLevel, definitionLevel);
-          break;
-        case FLOAT:
-          columnWriter.write(columnReader.getFloat(), repetitionLevel, definitionLevel);
-          break;
-        case DOUBLE:
-          columnWriter.write(columnReader.getDouble(), repetitionLevel, definitionLevel);
-          break;
-        default:
-          throw new IllegalArgumentException("Unknown primitive type " + type);
-      }
-    }
-    columnReader.consume();
-    if (repetitionLevel == 0) {
-      // V1 pages also respect record boundaries so we have to mark them
-      columnWriteStore.endRecord();
-    }
-  }
-
   /**
    * @param file a file stream to read from
    * @param rowGroups row groups to copy
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
deleted file mode 100644
index 02dadc7..0000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.util;
-
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static java.util.Collections.unmodifiableList;
-
-public class BlocksCombiner {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BlocksCombiner.class);
-
-  public static List<SmallBlocksUnion> combineLargeBlocks(List<ParquetFileReader> readers, long maxBlockSize) {
-    List<SmallBlocksUnion> blocks = new ArrayList<>();
-    long largeBlockSize = 0;
-    long largeBlockRecords = 0;
-    List<SmallBlock> smallBlocks = new ArrayList<>();
-    for (ParquetFileReader reader : readers) {
-      for (int blockIndex = 0; blockIndex < reader.blocksCount(); blockIndex++) {
-        BlockMetaData block = reader.getBlockMetaData(blockIndex);
-        if (!smallBlocks.isEmpty() && largeBlockSize + block.getTotalByteSize() > maxBlockSize) {
-          blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
-          smallBlocks = new ArrayList<>();
-          largeBlockSize = 0;
-          largeBlockRecords = 0;
-        }
-        largeBlockSize += block.getTotalByteSize();
-        largeBlockRecords += block.getRowCount();
-        smallBlocks.add(new SmallBlock(reader, blockIndex));
-      }
-    }
-    if (!smallBlocks.isEmpty()) {
-      blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
-    }
-    return unmodifiableList(blocks);
-  }
-
-  public static void closeReaders(List<ParquetFileReader> readers) {
-    readers.forEach(r -> {
-      try {
-        r.close();
-      } catch (IOException e) {
-        LOG.error("Error closing reader {}", r.getFile(), e);
-      }
-    });
-  }
-
-  public static class SmallBlocksUnion {
-    private final List<SmallBlock> blocks;
-    private final long rowCount;
-
-    public SmallBlocksUnion(List<SmallBlock> blocks, long rowCount) {
-      this.blocks = blocks;
-      this.rowCount = rowCount;
-    }
-
-    public List<SmallBlock> getBlocks() {
-      return blocks;
-    }
-
-    public long getRowCount() {
-      return rowCount;
-    }
-  }
-
-  public static class SmallBlock {
-    private final ParquetFileReader reader;
-    private final int blockIndex;
-
-    public SmallBlock(ParquetFileReader reader, int blockIndex) {
-      this.reader = reader;
-      this.blockIndex = blockIndex;
-    }
-
-    public ParquetFileReader getReader() {
-      return reader;
-    }
-
-    public int getBlockIndex() {
-      return blockIndex;
-    }
-  }
-}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java
deleted file mode 100644
index a972238..0000000
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.io.InputFile;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Types;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import static java.util.Arrays.asList;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
-import static org.apache.parquet.schema.OriginalType.UTF8;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
-
-public class TestParquetWriterMergeBlocks {
-
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-
-  public static final int FILE_SIZE = 10000;
-  public static final Configuration CONF = new Configuration();
-  public static final Map<String, String> EMPTY_METADATA =
-    new HashMap<String, String>();
-  public static final MessageType FILE_SCHEMA = Types.buildMessage()
-    .required(INT32).named("id")
-    .required(BINARY).as(UTF8).named("string")
-    .named("AppendTest");
-  public static final SimpleGroupFactory GROUP_FACTORY =
-    new SimpleGroupFactory(FILE_SCHEMA);
-
-  public Path file1;
-  public List<Group> file1content = new ArrayList<Group>();
-  public Path file2;
-  public List<Group> file2content = new ArrayList<Group>();
-
-  @Before
-  public void createSourceData() throws IOException {
-    this.file1 = newTemp();
-    this.file2 = newTemp();
-
-    ParquetWriter<Group> writer1 = ExampleParquetWriter.builder(file1)
-      .withType(FILE_SCHEMA)
-      .build();
-    ParquetWriter<Group> writer2 = ExampleParquetWriter.builder(file2)
-      .withType(FILE_SCHEMA)
-      .build();
-
-    for (int i = 0; i < FILE_SIZE; i += 1) {
-      Group group1 = GROUP_FACTORY.newGroup();
-      group1.add("id", i);
-      group1.add("string", UUID.randomUUID().toString());
-      writer1.write(group1);
-      file1content.add(group1);
-
-      Group group2 = GROUP_FACTORY.newGroup();
-      group2.add("id", FILE_SIZE+i);
-      group2.add("string", UUID.randomUUID().toString());
-      writer2.write(group2);
-      file2content.add(group2);
-    }
-
-    writer1.close();
-    writer2.close();
-  }
-
-  @Test
-  public void testBasicBehavior() throws IOException {
-    Path combinedFile = newTemp();
-    ParquetFileWriter writer = new ParquetFileWriter(
-      CONF, FILE_SCHEMA, combinedFile);
-
-    // Merge schema and extraMeta
-    List<Path> inputFiles = asList(file1, file2);
-    FileMetaData mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles, CONF).getFileMetaData();
-    List<InputFile> inputFileList = toInputFiles(inputFiles);
-    CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(CompressionCodecName.SNAPPY);
-
-    writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), 128 * 1024 * 1024);
-
-    LinkedList<Group> expected = new LinkedList<>();
-    expected.addAll(file1content);
-    expected.addAll(file2content);
-
-    ParquetReader<Group> reader = ParquetReader
-      .builder(new GroupReadSupport(), combinedFile)
-      .build();
-
-    Group next;
-    while ((next = reader.read()) != null) {
-      Group expectedNext = expected.removeFirst();
-      // check each value; equals is not supported for simple records
-      Assert.assertEquals("Each id should match",
-        expectedNext.getInteger("id", 0), next.getInteger("id", 0));
-      Assert.assertEquals("Each string should match",
-        expectedNext.getString("string", 0), next.getString("string", 0));
-    }
-
-    Assert.assertEquals("All records should be present", 0, expected.size());
-  }
-
-  private List<InputFile> toInputFiles(List<Path> inputFiles) {
-    return inputFiles.stream()
-      .map(input -> {
-        try {
-          return HadoopInputFile.fromPath(input, CONF);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }).collect(Collectors.toList());
-  }
-
-  @Test
-  public void testMergedMetadata() throws IOException {
-    Path combinedFile = newTemp();
-    ParquetFileWriter writer = new ParquetFileWriter(
-      CONF, FILE_SCHEMA, combinedFile);
-
-    // Merge schema and extraMeta
-    List<Path> inputFiles = asList(file1, file2);
-    FileMetaData mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles, CONF).getFileMetaData();
-    List<InputFile> inputFileList = toInputFiles(inputFiles);
-    CompressionCodecName codecName = CompressionCodecName.GZIP;
-    CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(codecName);
-    writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), 128 * 1024 * 1024);
-
-    ParquetMetadata combinedFooter = ParquetFileReader.readFooter(
-      CONF, combinedFile, NO_FILTER);
-    ParquetMetadata f1Footer = ParquetFileReader.readFooter(
-      CONF, file1, NO_FILTER);
-    ParquetMetadata f2Footer = ParquetFileReader.readFooter(
-      CONF, file2, NO_FILTER);
-
-    LinkedList<BlockMetaData> expectedRowGroups = new LinkedList<>();
-    expectedRowGroups.addAll(f1Footer.getBlocks());
-    expectedRowGroups.addAll(f2Footer.getBlocks());
-    long totalRowCount = expectedRowGroups.stream().mapToLong(BlockMetaData::getRowCount).sum();
-    Assert.assertEquals("Combined should have a single row group",
-      1,
-      combinedFooter.getBlocks().size());
-
-    BlockMetaData rowGroup = combinedFooter.getBlocks().get(0);
-    Assert.assertEquals("Row count should match",
-      totalRowCount, rowGroup.getRowCount());
-    assertColumnsEquivalent(f1Footer.getBlocks().get(0).getColumns(), rowGroup.getColumns(), codecName);
-  }
-
-  public void assertColumnsEquivalent(List<ColumnChunkMetaData> expected,
-                                      List<ColumnChunkMetaData> actual,
-                                      CompressionCodecName codecName) {
-    Assert.assertEquals("Should have the expected columns",
-      expected.size(), actual.size());
-    for (int i = 0; i < actual.size(); i += 1) {
-      long numNulls = 0;
-      long valueCount = 0;
-      ColumnChunkMetaData current = actual.get(i);
-      Statistics statistics = current.getStatistics();
-      numNulls += statistics.getNumNulls();
-      valueCount += current.getValueCount();
-      if (i != 0) {
-        ColumnChunkMetaData previous = actual.get(i - 1);
-        long expectedStart = previous.getStartingPos() + previous.getTotalSize();
-        Assert.assertEquals("Should start after the previous column",
-          expectedStart, current.getStartingPos());
-      }
-
-      assertColumnMetadataEquivalent(expected.get(i), current, codecName, numNulls, valueCount);
-    }
-  }
-
-  public void assertColumnMetadataEquivalent(ColumnChunkMetaData expected,
-                                             ColumnChunkMetaData actual,
-                                             CompressionCodecName codecName,
-                                             long numNulls,
-                                             long valueCount) {
-    Assert.assertEquals("Should be the expected column",
-      expected.getPath(), expected.getPath());
-    Assert.assertEquals("Primitive type should not change",
-      expected.getType(), actual.getType());
-    Assert.assertEquals("Compression codec should not change",
-      codecName, actual.getCodec());
-    Assert.assertEquals("Data encodings should not change",
-      expected.getEncodings(), actual.getEncodings());
-    Assert.assertEquals("Statistics should not change",
-      numNulls, actual.getStatistics().getNumNulls());
-    Assert.assertEquals("Number of values should not change",
-      valueCount, actual.getValueCount());
-
-  }
-
-  @Test
-  public void testAllowDroppingColumns() throws IOException {
-    MessageType droppedColumnSchema = Types.buildMessage()
-      .required(BINARY).as(UTF8).named("string")
-      .named("AppendTest");
-
-    Path droppedColumnFile = newTemp();
-    List<Path> inputFiles = asList(file1, file2);
-    ParquetFileWriter writer = new ParquetFileWriter(
-      CONF, droppedColumnSchema, droppedColumnFile);
-    List<InputFile> inputFileList = toInputFiles(inputFiles);
-    CompressionCodecName codecName = CompressionCodecName.GZIP;
-    CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(codecName);
-    writer.merge(inputFileList, compressor, "", 128*1024*1024);
-
-    LinkedList<Group> expected = new LinkedList<Group>();
-    expected.addAll(file1content);
-    expected.addAll(file2content);
-
-    ParquetMetadata footer = ParquetFileReader.readFooter(
-      CONF, droppedColumnFile, NO_FILTER);
-    for (BlockMetaData rowGroup : footer.getBlocks()) {
-      Assert.assertEquals("Should have only the string column",
-        1, rowGroup.getColumns().size());
-    }
-
-    ParquetReader<Group> reader = ParquetReader
-      .builder(new GroupReadSupport(), droppedColumnFile)
-      .build();
-
-    Group next;
-    while ((next = reader.read()) != null) {
-      Group expectedNext = expected.removeFirst();
-      Assert.assertEquals("Each string should match",
-        expectedNext.getString("string", 0), next.getString("string", 0));
-    }
-
-    Assert.assertEquals("All records should be present", 0, expected.size());
-  }
-
-  private Path newTemp() throws IOException {
-    File file = temp.newFile();
-    Preconditions.checkArgument(file.delete(), "Could not remove temp file");
-    return new Path(file.toString());
-  }
-}
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
index 6d5b313..fe64587 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -19,29 +19,20 @@
 package org.apache.parquet.tools.command;
 
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.io.InputFile;
 import org.apache.parquet.tools.Main;
 
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
 
 public class MergeCommand extends ArgsOnlyCommand {
   public static final String[] USAGE = new String[] {
@@ -58,32 +49,6 @@ public class MergeCommand extends ArgsOnlyCommand {
 
   private Configuration conf;
 
-  private static final Options OPTIONS;
-  static {
-    OPTIONS = new Options();
-
-    Option block = Option.builder("b")
-      .longOpt("block")
-      .desc("Merge adjacent blocks into one up to upper bound size limit default to 128 MB")
-      .build();
-
-    Option limit = Option.builder("l")
-      .longOpt("limit")
-      .desc("Upper bound for merged block size in megabytes. Default: 128 MB")
-      .hasArg()
-      .build();
-
-    Option codec = Option.builder("c")
-      .longOpt("codec")
-      .desc("Compression codec name. Default: SNAPPY. Valid values: UNCOMPRESSED, SNAPPY,
GZIP, LZO, BROTLI, LZ4, ZSTD")
-      .hasArg()
-      .build();
-
-    OPTIONS.addOption(limit);
-    OPTIONS.addOption(block);
-    OPTIONS.addOption(codec);
-  }
-
   public MergeCommand() {
     super(2, MAX_FILE_NUM + 1);
 
@@ -91,11 +56,6 @@ public class MergeCommand extends ArgsOnlyCommand {
   }
 
   @Override
-  public Options getOptions() {
-    return OPTIONS;
-  }
-
-  @Override
   public String[] getUsageDescription() {
     return USAGE;
   }
@@ -103,32 +63,18 @@ public class MergeCommand extends ArgsOnlyCommand {
   @Override
   public String getCommandDescription() {
     return "Merges multiple Parquet files into one. " +
-      "Without -b option the command doesn't merge row groups, just places one after the
other. " +
+      "The command doesn't merge row groups, just places one after the other. " +
       "When used to merge many small files, the resulting file will still contain small row
groups, " +
-      "which usually leads to bad query performance. " +
-      "To have adjacent small blocks merged together use -b option. " +
-      "Blocks will be grouped into larger one until the upper bound is reached. " +
-      "Default block upper bound 128 MB and default compression SNAPPY can be customized
using -l and -c options";
+      "which usually leads to bad query performance.";
   }
 
   @Override
   public void execute(CommandLine options) throws Exception {
-    boolean mergeBlocks = options.hasOption('b');
-    int maxBlockSize = options.hasOption('l')? Integer.parseInt(options.getOptionValue('l')) * 1024 * 1024 : DEFAULT_BLOCK_SIZE;
-    CompressionCodecName compressionCodec = options.hasOption('c') ? CompressionCodecName.valueOf(options.getOptionValue('c')) : CompressionCodecName.SNAPPY;
     // Prepare arguments
     List<String> args = options.getArgList();
     List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
     Path outputFile = new Path(args.get(args.size() - 1));
-    if (mergeBlocks) {
-      CodecFactory.BytesCompressor compressor = new CodecFactory(conf, DEFAULT_PAGE_SIZE).getCompressor(compressionCodec);
-      mergeBlocks(maxBlockSize, compressor, inputFiles, outputFile);
-    } else {
-      mergeFiles(inputFiles, outputFile);
-    }
-  }
 
-  private void mergeFiles(List<Path> inputFiles, Path outputFile) throws IOException {
     // Merge schema and extraMeta
     FileMetaData mergedMeta = mergedMetadata(inputFiles);
     PrintWriter out = new PrintWriter(Main.out, true);
@@ -157,23 +103,6 @@ public class MergeCommand extends ArgsOnlyCommand {
     writer.end(mergedMeta.getKeyValueMetaData());
   }
 
-  private void mergeBlocks(int maxBlockSize, CodecFactory.BytesCompressor compressor, List<Path> inputFiles, Path outputFile) throws IOException {
-    // Merge schema and extraMeta
-    FileMetaData mergedMeta = mergedMetadata(inputFiles);
-
-    // Merge data
-    ParquetFileWriter writer = new ParquetFileWriter(conf, mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
-    List<InputFile> inputFileList = inputFiles.stream()
-      .map(input -> {
-        try {
-          return HadoopInputFile.fromPath(input, conf);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }).collect(Collectors.toList());
-    writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), maxBlockSize);
-  }
-
   private FileMetaData mergedMetadata(List<Path> inputFiles) throws IOException {
     return ParquetFileWriter.mergeMetadataFiles(inputFiles, conf).getFileMetaData();
   }

