parquet-commits mailing list archives

From jul...@apache.org
Subject git commit: PARQUET-104: Fix writing empty row group at the end of the file
Date Mon, 29 Sep 2014 19:00:46 GMT
Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master bf20abbf4 -> 0b17cbee9


PARQUET-104: Fix writing empty row group at the end of the file

At the end of a Parquet file, the writer may emit an empty row group.
This happens when: numberOfRecords mod sizeOfRowGroup = 0
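
To illustrate, a standalone sketch of the failure mode (all names below are hypothetical stand-ins, not the real writer API): when a flush lands exactly on the last record, recordCount is already 0 when close() flushes again, and a zero-record row group was written.

    // Hypothetical sketch: a count-based stand-in for the memory-based flush.
    class RowGroupFlushSketch {
      private long recordCount = 0;
      private int rowGroupsWritten = 0;
      private static final long FLUSH_EVERY = 3; // stand-in for the row group size check

      void write() {
        recordCount++;
        if (recordCount == FLUSH_EVERY) {
          flush();
        }
      }

      void close() {
        flush(); // before this patch, a row group was written even when recordCount == 0
      }

      private void flush() {
        if (recordCount > 0) { // the guard this patch introduces
          rowGroupsWritten++;
          recordCount = 0;
        }
      }

      public static void main(String[] args) {
        RowGroupFlushSketch writer = new RowGroupFlushSketch();
        for (int i = 0; i < 6; i++) writer.write(); // 6 mod 3 == 0
        writer.close();
        System.out.println(writer.rowGroupsWritten); // 2; without the guard it would be 3
      }
    }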

Author: Tianshuo Deng <tdeng@twitter.com>

Closes #66 from tsdeng/fix_empty_row_group and squashes the following commits:

10b93fb [Tianshuo Deng] rename
e3a5896 [Tianshuo Deng] format
91fa0d4 [Tianshuo Deng] fix empty row group


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/0b17cbee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/0b17cbee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/0b17cbee

Branch: refs/heads/master
Commit: 0b17cbee9541998df66d33c8a99b675ced80d9aa
Parents: bf20abb
Author: Tianshuo Deng <tdeng@twitter.com>
Authored: Mon Sep 29 12:00:03 2014 -0700
Committer: julien <julien@twitter.com>
Committed: Mon Sep 29 12:00:03 2014 -0700

----------------------------------------------------------------------
 .../hadoop/InternalParquetRecordWriter.java     | 64 +++++++++++---------
 1 file changed, 34 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/0b17cbee/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index 727d21d..d73c811 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -42,11 +42,11 @@ class InternalParquetRecordWriter<T> {
   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
 
-  private final ParquetFileWriter w;
+  private final ParquetFileWriter parquetFileWriter;
   private final WriteSupport<T> writeSupport;
   private final MessageType schema;
   private final Map<String, String> extraMetaData;
-  private final int blockSize;
+  private final int rowGroupSize;
   private final int pageSize;
   private final BytesCompressor compressor;
   private final int dictionaryPageSize;
@@ -57,34 +57,34 @@ class InternalParquetRecordWriter<T> {
   private long recordCount = 0;
   private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
 
-  private ColumnWriteStoreImpl store;
+  private ColumnWriteStoreImpl columnStore;
   private ColumnChunkPageWriteStore pageStore;
 
   /**
-   * @param w the file to write to
+   * @param parquetFileWriter the file to write to
    * @param writeSupport the class to convert incoming records
    * @param schema the schema of the records
    * @param extraMetaData extra meta data to write in the footer of the file
-   * @param blockSize the size of a block in the file (this will be approximate)
-   * @param codec the codec used to compress
+   * @param rowGroupSize the size of a block in the file (this will be approximate)
+   * @param compressor the codec used to compress
    */
   public InternalParquetRecordWriter(
-      ParquetFileWriter w,
+      ParquetFileWriter parquetFileWriter,
       WriteSupport<T> writeSupport,
       MessageType schema,
       Map<String, String> extraMetaData,
-      int blockSize,
+      int rowGroupSize,
       int pageSize,
       BytesCompressor compressor,
       int dictionaryPageSize,
       boolean enableDictionary,
       boolean validating,
       WriterVersion writerVersion) {
-    this.w = w;
+    this.parquetFileWriter = parquetFileWriter;
     this.writeSupport = checkNotNull(writeSupport, "writeSupport");
     this.schema = schema;
     this.extraMetaData = extraMetaData;
-    this.blockSize = blockSize;
+    this.rowGroupSize = rowGroupSize;
     this.pageSize = pageSize;
     this.compressor = compressor;
     this.dictionaryPageSize = dictionaryPageSize;
@@ -98,22 +98,22 @@ class InternalParquetRecordWriter<T> {
     // we don't want this number to be too small
     // ideally we divide the block equally across the columns
     // it is unlikely all columns are going to be the same size.
-    int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5);
+    int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5);
     pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize);
     // we don't want this number to be too small either
     // ideally, slightly bigger than the page size, but not bigger than the block buffer
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
-    store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
+    columnStore = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
-    writeSupport.prepareForWrite(columnIO.getRecordWriter(store));
+    writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
   }
 
   public void close() throws IOException, InterruptedException {
-    flushStore();
+    flushRowGroupToStore();
     FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
     Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData);
     finalMetadata.putAll(finalWriteContext.getExtraMetaData());
-    w.end(finalMetadata);
+    parquetFileWriter.end(finalMetadata);
   }
 
   public void write(T value) throws IOException, InterruptedException {
@@ -124,16 +124,16 @@ class InternalParquetRecordWriter<T> {
 
   private void checkBlockSizeReached() throws IOException {
     if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
-      long memSize = store.memSize();
-      if (memSize > blockSize) {
-        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
-        flushStore();
+      long memSize = columnStore.memSize();
+      if (memSize > rowGroupSize) {
+        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSize, recordCount));
+        flushRowGroupToStore();
         initStore();
         recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
       } else {
         float recordSize = (float) memSize / recordCount;
         recordCountForNextMemCheck = min(
-            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway
+            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSize / recordSize)) / 2), // will check halfway
             recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
             );
         if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
@@ -141,18 +141,22 @@ class InternalParquetRecordWriter<T> {
     }
   }
 
-  private void flushStore()
+  private void flushRowGroupToStore()
       throws IOException {
-    LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
-    if (store.allocatedSize() > 3 * (long) blockSize) {
-      LOG.warn("Too much memory used: " + store.memUsageString());
+    LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.allocatedSize()));
+    if (columnStore.allocatedSize() > 3 * (long)rowGroupSize) {
+      LOG.warn("Too much memory used: " + columnStore.memUsageString());
     }
-    w.startBlock(recordCount);
-    store.flush();
-    pageStore.flushToFileWriter(w);
-    recordCount = 0;
-    w.endBlock();
-    store = null;
+
+    if (recordCount > 0) {
+      parquetFileWriter.startBlock(recordCount);
+      columnStore.flush();
+      pageStore.flushToFileWriter(parquetFileWriter);
+      recordCount = 0;
+      parquetFileWriter.endBlock();
+    }
+
+    columnStore = null;
     pageStore = null;
   }
 }
\ No newline at end of file
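
----------------------------------------------------------------------

A worked example of the arithmetic renamed above: the initStore() buffer formulas and the checkBlockSizeReached() next-check heuristic. All input values are illustrative, and MINIMUM_BUFFER_SIZE is a class constant defined outside this diff (assumed to be 64 KB here):

    import static java.lang.Math.max;
    import static java.lang.Math.min;

    class BufferSizingExample {
      public static void main(String[] args) {
        int minimumBufferSize = 64 * 1024;    // assumed stand-in for MINIMUM_BUFFER_SIZE
        int rowGroupSize = 128 * 1024 * 1024; // 128 MB row group (illustrative)
        int columnCount = 10;                 // schema.getColumns().size() (illustrative)
        int pageSize = 1024 * 1024;           // 1 MB pages (illustrative)

        // a fifth of an equal per-column share of the row group
        int initialBlockBufferSize =
            max(minimumBufferSize, rowGroupSize / columnCount / 5);
        // slightly bigger than a page, but never bigger than the block buffer
        int initialPageBufferSize =
            max(minimumBufferSize, min(pageSize + pageSize / 10, initialBlockBufferSize));
        System.out.printf("block buffer: %,d bytes%n", initialBlockBufferSize); // 2,684,354
        System.out.printf("page buffer:  %,d bytes%n", initialPageBufferSize);  // 1,153,433

        // next-memory-check heuristic: check again halfway to the projected full row group,
        // bounded by MINIMUM/MAXIMUM_RECORD_COUNT_FOR_CHECK (100 and 10000 per the diff)
        long recordCount = 5000;
        long memSize = 60L * 1024 * 1024;                 // 60 MB buffered so far (illustrative)
        float recordSize = (float) memSize / recordCount; // ~12,583 bytes per record
        long nextCheck = min(
            max(100, (recordCount + (long) (rowGroupSize / recordSize)) / 2),
            recordCount + 10000);
        System.out.printf("next check at record %,d%n", nextCheck); // 7,833
      }
    }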

