parquet-commits mailing list archives

From ga...@apache.org
Subject [parquet-mr] 01/02: Merge branch 'master' into column-indexes
Date Fri, 28 Sep 2018 07:52:19 GMT
This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git

commit 85e699c8b78ea1b50a1bd0e02d88acea15bd293a
Merge: 55d791c a150f24
Author: Gabor Szadovszky <gabor@apache.org>
AuthorDate: Fri Sep 28 09:25:44 2018 +0200

    Merge branch 'master' into column-indexes

 .travis.yml                                        |   2 +-
 README.md                                          |  92 +-
 dev/README.md                                      |   4 +-
 dev/source-release.sh                              |   3 +-
 parquet-arrow/pom.xml                              |   2 +-
 .../parquet/arrow/schema/SchemaConverter.java      | 260 +++---
 .../parquet/arrow/schema/TestSchemaConverter.java  |  27 +-
 parquet-avro/pom.xml                               |   4 +-
 .../apache/parquet/avro/AvroSchemaConverter.java   | 176 ++--
 .../parquet/avro/TestAvroSchemaConverter.java      |  14 +-
 .../org/apache/parquet/avro/TestReadWrite.java     |  31 +
 .../parquet/cascading/convert/TupleConverter.java  |   9 +-
 .../parquet/cascading/TestParquetTBaseScheme.java  |   7 +-
 .../src/main/java/org/apache/parquet/cli/Util.java |  10 +
 .../cli/commands/ParquetMetadataCommand.java       |   4 +-
 .../cli/commands/ShowDictionaryCommand.java        |   4 +-
 .../parquet/cli/commands/ShowPagesCommand.java     |   4 +-
 .../parquet/column/impl/ColumnReadStoreImpl.java   |   5 +
 .../apache/parquet/column/values/ValuesReader.java |  70 ++
 .../values/bitpacking/BitPackingValuesReader.java  |   1 +
 .../bitpacking/ByteBitPackingValuesReader.java     |   1 +
 .../delta/DeltaBinaryPackingValuesReader.java      |   2 +
 .../values/plain/BooleanPlainValuesReader.java     |   6 +
 .../rle/RunLengthBitPackingHybridValuesReader.java |   3 +
 .../column/values/rle/ZeroIntegerValuesReader.java |   1 +
 .../parquet/filter2/predicate/ValidTypeMap.java    |   7 +-
 .../apache/parquet/schema/ConversionPatterns.java  |  28 +-
 .../java/org/apache/parquet/schema/GroupType.java  |  53 +-
 .../parquet/schema/LogicalTypeAnnotation.java      | 983 +++++++++++++++++++++
 .../org/apache/parquet/schema/MessageType.java     |   8 +-
 .../apache/parquet/schema/MessageTypeParser.java   |  55 +-
 .../org/apache/parquet/schema/OriginalType.java    |  66 +-
 .../apache/parquet/schema/PrimitiveComparator.java |  10 +-
 .../org/apache/parquet/schema/PrimitiveType.java   | 269 +++---
 .../main/java/org/apache/parquet/schema/Type.java  |  40 +-
 .../main/java/org/apache/parquet/schema/Types.java | 213 ++++-
 ...ltaBinaryPackingValuesWriterForIntegerTest.java |   8 +
 .../DeltaBinaryPackingValuesWriterForLongTest.java |   8 +
 .../column/values/dictionary/TestDictionary.java   |   5 +
 .../filter2/predicate/TestValidTypeMap.java        |   7 +-
 .../apache/parquet/parser/TestParquetParser.java   |  72 +-
 .../org/apache/parquet/schema/TestMessageType.java |   2 +-
 .../parquet/schema/TestPrimitiveComparator.java    |  19 +
 .../apache/parquet/schema/TestTypeBuilders.java    |  76 +-
 parquet-common/pom.xml                             |   4 +-
 .../parquet/bytes/ByteBufferInputStream.java       | 100 ++-
 .../java/org/apache/parquet/bytes/BytesInput.java  |  16 +
 .../parquet/bytes/MultiBufferInputStream.java      |   2 +-
 .../parquet/bytes/TestByteBufferInputStreams.java  |  14 +
 ...m.java => TestDeprecatedBufferInputStream.java} |  94 +-
 .../parquet/bytes/TestSingleBufferInputStream.java |   2 +-
 parquet-format-structures/pom.xml                  | 206 +++++
 .../apache/parquet/format/InterningProtocol.java   | 231 +++++
 .../org/apache/parquet/format/LogicalTypes.java    |  55 ++
 .../main/java/org/apache/parquet/format/Util.java  | 236 +++++
 .../org/apache/parquet/format/event/Consumers.java | 193 ++++
 .../format/event/EventBasedThriftReader.java       | 126 +++
 .../apache/parquet/format/event/FieldConsumer.java |  45 +-
 .../apache/parquet/format/event/TypedConsumer.java | 205 +++++
 .../java/org/apache/parquet/format/TestUtil.java   |  83 ++
 parquet-hadoop/pom.xml                             |   4 +-
 .../java/org/apache/parquet/HadoopReadOptions.java |   4 +-
 .../format/converter/ParquetMetadataConverter.java | 482 +++++++---
 .../parquet/hadoop/ColumnChunkPageWriteStore.java  |   5 +
 .../hadoop/InternalParquetRecordWriter.java        |   4 +-
 .../apache/parquet/hadoop/ParquetFileReader.java   | 100 ++-
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 114 +++
 .../parquet/hadoop/metadata/ParquetMetadata.java   |  15 +-
 .../apache/parquet/hadoop/util/BlocksCombiner.java | 106 +++
 .../converter/TestParquetMetadataConverter.java    |  95 +-
 .../hadoop/TestParquetWriterMergeBlocks.java       | 280 ++++++
 .../apache/parquet/statistics/RandomValues.java    |   7 +-
 .../ql/io/parquet/convert/HiveSchemaConverter.java |  17 +-
 parquet-pig/pom.xml                                |   4 +-
 .../org/apache/parquet/pig/PigSchemaConverter.java | 130 +--
 .../apache/parquet/pig/convert/TupleConverter.java |  31 +-
 parquet-protobuf/pom.xml                           |  11 +
 .../parquet/proto/ProtoMessageConverter.java       |  43 +-
 .../apache/parquet/proto/ProtoSchemaConverter.java |  45 +-
 .../apache/parquet/proto/ProtoWriteSupport.java    |  29 +-
 parquet-thrift/pom.xml                             |  11 +
 .../parquet/thrift/ThriftSchemaConvertVisitor.java |  18 +-
 parquet-tools/pom.xml                              |   4 +-
 .../apache/parquet/tools/command/DumpCommand.java  |   1 -
 .../apache/parquet/tools/command/MergeCommand.java |  75 +-
 .../tools/{util => command}/MetadataUtils.java     |  93 +-
 .../parquet/tools/command/ShowMetaCommand.java     |  29 +-
 .../parquet/tools/command/ShowSchemaCommand.java   |  14 +-
 .../parquet/tools/read/SimpleRecordConverter.java  |  56 +-
 .../apache/parquet/tools/util/MetadataUtils.java   |   9 +-
 pom.xml                                            |   8 +
 91 files changed, 5100 insertions(+), 1027 deletions(-)

diff --cc parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
index 8066564,e582908..6fa39ec
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@@ -72,15 -72,14 +72,20 @@@ public class ColumnReadStoreImpl implem
  
    @Override
    public ColumnReader getColumnReader(ColumnDescriptor path) {
 -    return newMemColumnReader(path, pageReadStore.getPageReader(path));
 +    PrimitiveConverter converter = getPrimitiveConverter(path);
 +    PageReader pageReader = pageReadStore.getPageReader(path);
 +    if (pageReadStore.isInPageFilteringMode()) {
 +      return new SynchronizingColumnReader(path, pageReader, converter, writerVersion, pageReadStore.getRowIndexes());
 +    } else {
 +      return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
 +    }
    }
  
+   public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
+     PrimitiveConverter converter = getPrimitiveConverter(path);
+     return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
+   }
+ 
    private PrimitiveConverter getPrimitiveConverter(ColumnDescriptor path) {
      Type currentType = schema;
      Converter currentConverter = recordConverter;
diff --cc parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 5f75460,b8f481e..4f5c78a
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@@ -82,10 -80,9 +82,10 @@@ public class HadoopReadOptions extends 
      public Builder(Configuration conf) {
        this.conf = conf;
        useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
-       useDictionaryFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
-       useStatsFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
+       useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
+       useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
        useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
 +      useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true));
        withCodecFactory(HadoopCodecs.newFactory(conf, 0));
        withRecordFilter(getFilter(conf));
        withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
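
The HadoopReadOptions hunk above fixes swapped configuration keys (dictionary filtering now reads DICTIONARY_FILTERING_ENABLED and stats filtering reads STATS_FILTERING_ENABLED) and wires in the new column-index filter toggle. A minimal sketch, not part of the commit, of setting the same toggles programmatically; it assumes the public HadoopReadOptions.builder(Configuration) entry point and the boolean builder methods visible in the hunk:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;

public class ReadOptionsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed builder calls; the 'true' values mirror the conf.getBoolean(..., true) defaults above.
    ParquetReadOptions options = HadoopReadOptions.builder(conf)
        .useDictionaryFilter(true)   // row-group filtering via dictionary pages
        .useStatsFilter(true)        // row-group filtering via min/max statistics
        .useColumnIndexFilter(true)  // page-level filtering via the new column indexes
        .build();
    // 'options' would then be passed to a reader, e.g. ParquetFileReader.open(file, options).
  }
}
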
diff --cc parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 6fce6f2,9478e94..484a505
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@@ -39,12 -41,26 +41,28 @@@ import java.util.concurrent.ConcurrentH
  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.CorruptStatistics;
  import org.apache.parquet.ParquetReadOptions;
+ import org.apache.parquet.format.BsonType;
  import org.apache.parquet.format.CompressionCodec;
+ import org.apache.parquet.format.DateType;
+ import org.apache.parquet.format.DecimalType;
+ import org.apache.parquet.format.EnumType;
+ import org.apache.parquet.format.IntType;
+ import org.apache.parquet.format.JsonType;
+ import org.apache.parquet.format.ListType;
+ import org.apache.parquet.format.LogicalType;
+ import org.apache.parquet.format.MapType;
+ import org.apache.parquet.format.MicroSeconds;
+ import org.apache.parquet.format.MilliSeconds;
+ import org.apache.parquet.format.NullType;
  import org.apache.parquet.format.PageEncodingStats;
+ import org.apache.parquet.format.StringType;
+ import org.apache.parquet.format.TimeType;
+ import org.apache.parquet.format.TimeUnit;
+ import org.apache.parquet.format.TimestampType;
  import org.apache.parquet.hadoop.metadata.ColumnPath;
 +import org.apache.parquet.format.BoundaryOrder;
  import org.apache.parquet.format.ColumnChunk;
 +import org.apache.parquet.format.ColumnIndex;
  import org.apache.parquet.format.ColumnMetaData;
  import org.apache.parquet.format.ColumnOrder;
  import org.apache.parquet.format.ConvertedType;
diff --cc parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index add0e09,527c831..5352309
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@@ -65,11 -68,8 +66,12 @@@ import org.apache.parquet.column.page.D
  import org.apache.parquet.column.page.DataPageV1;
  import org.apache.parquet.column.page.DataPageV2;
  import org.apache.parquet.column.page.DictionaryPage;
 +import org.apache.parquet.column.page.DictionaryPageReadStore;
++import org.apache.parquet.column.page.PageReader;
  import org.apache.parquet.column.page.PageReadStore;
 -import org.apache.parquet.hadoop.metadata.ColumnPath;
 +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 +import org.apache.parquet.filter2.compat.FilterCompat;
 +import org.apache.parquet.filter2.compat.RowGroupFilter;
  import org.apache.parquet.format.DataPageHeader;
  import org.apache.parquet.format.DataPageHeaderV2;
  import org.apache.parquet.format.DictionaryPageHeader;
@@@ -702,11 -688,16 +704,18 @@@ public class ParquetFileReader implemen
      this.file = file;
      this.f = file.newStream();
      this.options = options;
-     this.footer = readFooter(file, options, f, converter);
+     try {
+       this.footer = readFooter(file, options, f, converter);
+     } catch (Exception e) {
+       // In case that reading footer throws an exception in the constructor, the new stream
+       // should be closed. Otherwise, there's no way to close this outside.
+       f.close();
+       throw e;
+     }
      this.fileMetaData = footer.getFileMetaData();
      this.blocks = filterRowGroups(footer.getBlocks());
 +    this.blockIndexStores = listWithNulls(this.blocks.size());
 +    this.blockRowRanges = listWithNulls(this.blocks.size());
      for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
        paths.put(ColumnPath.get(col.getPath()), col);
      }
@@@ -1395,30 -1158,12 +1404,11 @@@
  
      /**
       * @param f file to read the chunks from
 -     * @return the chunks
 +     * @param builder used to build chunk list to read the pages for the different columns
       * @throws IOException if there is an error while reading from the stream
       */
 -    public List<Chunk> readAll(SeekableInputStream f) throws IOException {
 -      List<Chunk> result = new ArrayList<>(chunks.size());
 +    public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
-       f.seek(offset);
- 
-       int fullAllocations = length / options.getMaxAllocationSize();
-       int lastAllocationSize = length % options.getMaxAllocationSize();
- 
-       int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
-       List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
- 
-       for (int i = 0; i < fullAllocations; i += 1) {
-         buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
-       }
- 
-       if (lastAllocationSize > 0) {
-         buffers.add(options.getAllocator().allocate(lastAllocationSize));
-       }
- 
-       for (ByteBuffer buffer : buffers) {
-         f.readFully(buffer);
-         buffer.flip();
-       }
+       List<ByteBuffer> buffers = readBlocks(f, offset, length);
  
        // report in a counter the data we just scanned
        BenchmarkCounter.incrementBytesRead(length);
@@@ -1438,4 -1189,72 +1428,72 @@@
  
    }
  
+   /**
+    * @param f file to read the blocks from
+    * @return the ByteBuffer blocks
+    * @throws IOException if there is an error while reading from the stream
+    */
+   List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length) throws IOException {
+     f.seek(offset);
+ 
+     int fullAllocations = length / options.getMaxAllocationSize();
+     int lastAllocationSize = length % options.getMaxAllocationSize();
+ 
+     int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
+     List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+ 
+     for (int i = 0; i < fullAllocations; i++) {
+       buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+     }
+ 
+     if (lastAllocationSize > 0) {
+       buffers.add(options.getAllocator().allocate(lastAllocationSize));
+     }
+ 
+     for (ByteBuffer buffer : buffers) {
+       f.readFully(buffer);
+       buffer.flip();
+     }
+     return buffers;
+   }
+ 
+   Optional<PageReader> readColumnInBlock(int blockIndex, ColumnDescriptor columnDescriptor) {
+     BlockMetaData block = blocks.get(blockIndex);
+     if (block.getRowCount() == 0) {
+       throw new RuntimeException("Illegal row group of 0 rows");
+     }
+     Optional<ColumnChunkMetaData> mc = findColumnByPath(block, columnDescriptor.getPath());
+ 
+     return mc.map(column -> new ChunkDescriptor(columnDescriptor, column, column.getStartingPos(), (int) column.getTotalSize()))
+       .map(chunk -> readChunk(f, chunk));
+   }
+ 
+   private ColumnChunkPageReader readChunk(SeekableInputStream f, ChunkDescriptor descriptor) {
+     try {
+       List<ByteBuffer> buffers = readBlocks(f, descriptor.fileOffset, descriptor.size);
+       ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
 -      Chunk chunk = new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f);
++      Chunk chunk = new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f, null);
+       return chunk.readAllPages();
+     } catch (IOException e) {
+       throw new RuntimeException(e);
+     }
+   }
+ 
+   private Optional<ColumnChunkMetaData> findColumnByPath(BlockMetaData block, String[] path) {
+     for (ColumnChunkMetaData column : block.getColumns()) {
+       if (Arrays.equals(column.getPath().toArray(), path)) {
+         return Optional.of(column);
+       }
+     }
+     return Optional.empty();
+   }
+ 
+   public int blocksCount() {
+     return blocks.size();
+   }
+ 
+   public BlockMetaData getBlockMetaData(int blockIndex) {
+     return blocks.get(blockIndex);
+   }
+ 
  }
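
The ParquetFileReader changes above add block-level accessors (blocksCount(), getBlockMetaData(int)) and a per-column chunk reader (readColumnInBlock(int, ColumnDescriptor)). A rough sketch of walking a file block by block with these methods; it sits in the org.apache.parquet.hadoop package because readColumnInBlock is package-private in this diff, and everything beyond the signatures shown above is assumed:

package org.apache.parquet.hadoop;

import java.util.Optional;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.schema.MessageType;

class BlockColumnScan {
  static void scan(InputFile inputFile) throws Exception {
    try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
      MessageType schema = reader.getFileMetaData().getSchema();
      for (int block = 0; block < reader.blocksCount(); block++) {
        long rows = reader.getBlockMetaData(block).getRowCount();
        for (ColumnDescriptor column : schema.getColumns()) {
          // Columns missing from this file (e.g. after a schema merge) come back empty.
          Optional<PageReader> pages = reader.readColumnInBlock(block, column);
          System.out.println("block " + block + " (" + rows + " rows): " + column
              + " present=" + pages.isPresent());
        }
      }
    }
  }
}
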
diff --cc parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 3a65624,b944e97..7dd6e80
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@@ -48,11 -54,14 +55,15 @@@ import org.apache.parquet.column.Column
  import org.apache.parquet.column.Encoding;
  import org.apache.parquet.column.EncodingStats;
  import org.apache.parquet.column.ParquetProperties;
+ import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+ import org.apache.parquet.column.impl.ColumnWriteStoreV1;
  import org.apache.parquet.column.page.DictionaryPage;
+ import org.apache.parquet.column.page.PageReader;
  import org.apache.parquet.column.statistics.Statistics;
+ import org.apache.parquet.example.DummyRecordConverter;
  import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
  import org.apache.parquet.hadoop.metadata.ColumnPath;
 +import org.apache.parquet.format.Util;
  import org.apache.parquet.format.converter.ParquetMetadataConverter;
  import org.apache.parquet.hadoop.metadata.BlockMetaData;
  import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@@ -60,13 -69,9 +71,14 @@@ import org.apache.parquet.hadoop.metada
  import org.apache.parquet.hadoop.metadata.FileMetaData;
  import org.apache.parquet.hadoop.metadata.GlobalMetaData;
  import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+ import org.apache.parquet.hadoop.util.BlocksCombiner;
  import org.apache.parquet.hadoop.util.HadoopOutputFile;
  import org.apache.parquet.hadoop.util.HadoopStreams;
 +import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 +import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
 +import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 +import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 +import org.apache.parquet.internal.hadoop.metadata.IndexReference;
  import org.apache.parquet.io.InputFile;
  import org.apache.parquet.io.OutputFile;
  import org.apache.parquet.io.SeekableInputStream;
@@@ -655,6 -532,108 +667,108 @@@ public class ParquetFileWriter 
      ParquetFileReader.open(file).appendTo(this);
    }
  
+   public int merge(List<InputFile> inputFiles, CodecFactory.BytesCompressor compressor, String createdBy, long maxBlockSize) throws IOException {
+     List<ParquetFileReader> readers = getReaders(inputFiles);
+     try {
+       ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+       ColumnReadStoreImpl columnReadStore = new ColumnReadStoreImpl(null, new DummyRecordConverter(schema).getRootConverter(), schema, createdBy);
+       this.start();
+       List<BlocksCombiner.SmallBlocksUnion> largeBlocks = BlocksCombiner.combineLargeBlocks(readers, maxBlockSize);
+       for (BlocksCombiner.SmallBlocksUnion smallBlocks : largeBlocks) {
+         for (int columnIndex = 0; columnIndex < schema.getColumns().size(); columnIndex++) {
+           ColumnDescriptor path = schema.getColumns().get(columnIndex);
 -          ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator);
 -          ColumnWriteStoreV1 columnWriteStoreV1 = new ColumnWriteStoreV1(store, ParquetProperties.builder().build());
++          ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
++          ColumnWriteStoreV1 columnWriteStoreV1 = new ColumnWriteStoreV1(schema, store, ParquetProperties.builder().build());
+           for (BlocksCombiner.SmallBlock smallBlock : smallBlocks.getBlocks()) {
+             ParquetFileReader parquetFileReader = smallBlock.getReader();
+             try {
+               Optional<PageReader> columnChunkPageReader = parquetFileReader.readColumnInBlock(smallBlock.getBlockIndex(), path);
+               ColumnWriter columnWriter = columnWriteStoreV1.getColumnWriter(path);
+               if (columnChunkPageReader.isPresent()) {
+                 ColumnReader columnReader = columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
+                 for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
+                   consumeTriplet(columnWriter, columnReader);
+                 }
+               } else {
+                 MessageType inputFileSchema = parquetFileReader.getFileMetaData().getSchema();
+                 String[] parentPath = getExisingParentPath(path, inputFileSchema);
+                 int def = parquetFileReader.getFileMetaData().getSchema().getMaxDefinitionLevel(parentPath);
+                 int rep = parquetFileReader.getFileMetaData().getSchema().getMaxRepetitionLevel(parentPath);
+                 for (int i = 0; i < parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount(); i++) {
+                   columnWriter.writeNull(rep, def);
+                 }
+               }
+             } catch (Exception e) {
+               LOG.error("File {} is not readable", parquetFileReader.getFile(), e);
+             }
+           }
+           if (columnIndex == 0) {
+             this.startBlock(smallBlocks.getRowCount());
+           }
+           columnWriteStoreV1.flush();
+           store.flushToFileWriter(path, this);
+         }
+         this.endBlock();
+       }
+       this.end(Collections.emptyMap());
+     }finally {
+       BlocksCombiner.closeReaders(readers);
+     }
+     return 0;
+   }
+ 
+   private String[] getExisingParentPath(ColumnDescriptor path, MessageType inputFileSchema) {
+     List<String> parentPath = Arrays.asList(path.getPath());
+     while (parentPath.size() > 0 && !inputFileSchema.containsPath(parentPath.toArray(new String[parentPath.size()]))) {
+       parentPath = parentPath.subList(0, parentPath.size() - 1);
+     }
+     return parentPath.toArray(new String[parentPath.size()]);
+   }
+ 
+   private List<ParquetFileReader> getReaders(List<InputFile> inputFiles) throws IOException {
+     List<ParquetFileReader> readers = new ArrayList<>(inputFiles.size());
+     for (InputFile inputFile : inputFiles) {
+       readers.add(ParquetFileReader.open(inputFile));
+     }
+     return readers;
+   }
+ 
+   private void consumeTriplet(ColumnWriter columnWriter, ColumnReader columnReader) {
+     int definitionLevel = columnReader.getCurrentDefinitionLevel();
+     int repetitionLevel = columnReader.getCurrentRepetitionLevel();
+     ColumnDescriptor column = columnReader.getDescriptor();
+     PrimitiveType type = column.getPrimitiveType();
+     if (definitionLevel < column.getMaxDefinitionLevel()) {
+       columnWriter.writeNull(repetitionLevel, definitionLevel);
+     } else {
+       switch (type.getPrimitiveTypeName()) {
+         case INT32:
+           columnWriter.write(columnReader.getInteger(), repetitionLevel, definitionLevel);
+           break;
+         case INT64:
+           columnWriter.write(columnReader.getLong(), repetitionLevel, definitionLevel);
+           break;
+         case BINARY:
+         case FIXED_LEN_BYTE_ARRAY:
+         case INT96:
+           columnWriter.write(columnReader.getBinary(), repetitionLevel, definitionLevel);
+           break;
+         case BOOLEAN:
+           columnWriter.write(columnReader.getBoolean(), repetitionLevel, definitionLevel);
+           break;
+         case FLOAT:
+           columnWriter.write(columnReader.getFloat(), repetitionLevel, definitionLevel);
+           break;
+         case DOUBLE:
+           columnWriter.write(columnReader.getDouble(), repetitionLevel, definitionLevel);
+           break;
+         default:
+           throw new IllegalArgumentException("Unknown primitive type " + type);
+       }
+     }
+     columnReader.consume();
+   }
+ 
    /**
     * @param file a file stream to read from
     * @param rowGroups row groups to copy
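
The new merge method above rewrites several small input files into larger row groups by replaying every column chunk through a reader/writer pair (consumeTriplet). A hypothetical invocation sketch; the writer constructor, compressor setup, and 128 MB threshold are assumptions, only merge()'s signature comes from this commit:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.schema.MessageType;

class MergeSketch {
  static void mergeSmallFiles(Configuration conf, MessageType schema,
                              Path target, List<Path> inputs) throws Exception {
    // Assumed setup: 'schema' is the common schema of the inputs and 'target' does not exist yet.
    ParquetFileWriter writer = new ParquetFileWriter(conf, schema, target);
    CodecFactory.BytesCompressor compressor =
        new CodecFactory(conf, 1024 * 1024).getCompressor(CompressionCodecName.SNAPPY);
    List<InputFile> inputFiles = new ArrayList<>();
    for (Path p : inputs) {
      inputFiles.add(HadoopInputFile.fromPath(p, conf));
    }
    // merge() starts and ends the output file itself and combines the inputs'
    // small row groups up to the given byte limit, column by column.
    writer.merge(inputFiles, compressor, "parquet-mr merge sketch", 128L * 1024 * 1024);
  }
}
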
diff --cc parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index 90f4a5b,d1a3a3c..b653b4b
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@@ -66,13 -69,9 +70,14 @@@ import org.apache.parquet.hadoop.metada
  import org.apache.parquet.hadoop.metadata.ColumnPath;
  import org.apache.parquet.hadoop.metadata.CompressionCodecName;
  import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 +import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
 +import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 +import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
 +import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 +import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
  import org.apache.parquet.io.api.Binary;
  import org.apache.parquet.schema.PrimitiveType;
+ import org.apache.parquet.schema.LogicalTypeAnnotation;
  import org.junit.Assert;
  import org.junit.Test;
  import org.apache.parquet.example.Paper;
diff --cc pom.xml
index dbd68bb,4c9d79c..5ddec43
--- a/pom.xml
+++ b/pom.xml
@@@ -81,9 -81,10 +81,10 @@@
      <hadoop1.version>1.2.1</hadoop1.version>
      <cascading.version>2.7.1</cascading.version>
      <cascading3.version>3.1.2</cascading3.version>
 -    <parquet.format.version>2.4.0</parquet.format.version>
 +    <parquet.format.version>2.5.0</parquet.format.version>
      <previous.version>1.7.0</previous.version>
      <thrift.executable>thrift</thrift.executable>
+     <format.thrift.executable>thrift</format.thrift.executable>
      <scala.version>2.10.6</scala.version>
      <!-- scala.binary.version is used for projects that fetch dependencies that are in scala -->
      <scala.binary.version>2.10</scala.binary.version>

