From: julien@apache.org
To: commits@parquet.incubator.apache.org
Subject: git commit: PARQUET-104: Fix writing empty row group at the end of the file
Date: Mon, 29 Sep 2014 19:00:46 +0000 (UTC)

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master bf20abbf4 -> 0b17cbee9


PARQUET-104: Fix writing empty row group at the end of the file

At the end of a parquet file, it may write an empty row group.
This happens when: numberOfRecords mod sizeOfRowGroup = 0
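A minimal sketch of that boundary condition, assuming a simplified
record-count-based flush trigger (the real writer flushes on estimated
memory size, but the edge case is the same; all names below are
illustrative, not the project's API):

    // Hypothetical sketch: why close() could emit an empty row group,
    // and the guard this commit introduces.
    class RowGroupBoundarySketch {
      long recordCount = 0;
      final long recordsPerRowGroup = 100; // hypothetical threshold

      void write(Object record) {
        recordCount++;
        if (recordCount >= recordsPerRowGroup) {
          flushRowGroup(); // resets recordCount to 0
        }
      }

      void close() {
        // Before the fix this flush was unconditional, so if the last
        // write() landed exactly on a row-group boundary (recordCount == 0
        // here), an empty row group was appended to the file.
        if (recordCount > 0) { // the guard added in flushRowGroupToStore()
          flushRowGroup();
        }
      }

      void flushRowGroup() {
        // stands in for startBlock(recordCount) ... endBlock()
        recordCount = 0;
      }
    }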
Author: Tianshuo Deng

Closes #66 from tsdeng/fix_empty_row_group and squashes the following commits:

10b93fb [Tianshuo Deng] rename
e3a5896 [Tianshuo Deng] format
91fa0d4 [Tianshuo Deng] fix empty row group


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/0b17cbee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/0b17cbee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/0b17cbee

Branch: refs/heads/master
Commit: 0b17cbee9541998df66d33c8a99b675ced80d9aa
Parents: bf20abb
Author: Tianshuo Deng
Authored: Mon Sep 29 12:00:03 2014 -0700
Committer: julien
Committed: Mon Sep 29 12:00:03 2014 -0700

----------------------------------------------------------------------
 .../hadoop/InternalParquetRecordWriter.java | 64 +++++++++++---------
 1 file changed, 34 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/0b17cbee/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index 727d21d..d73c811 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -42,11 +42,11 @@ class InternalParquetRecordWriter<T> {
   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
 
-  private final ParquetFileWriter w;
+  private final ParquetFileWriter parquetFileWriter;
   private final WriteSupport<T> writeSupport;
   private final MessageType schema;
   private final Map<String, String> extraMetaData;
-  private final int blockSize;
+  private final int rowGroupSize;
   private final int pageSize;
   private final BytesCompressor compressor;
   private final int dictionaryPageSize;
@@ -57,34 +57,34 @@ class InternalParquetRecordWriter<T> {
   private long recordCount = 0;
   private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
 
-  private ColumnWriteStoreImpl store;
+  private ColumnWriteStoreImpl columnStore;
   private ColumnChunkPageWriteStore pageStore;
 
   /**
-   * @param w the file to write to
+   * @param parquetFileWriter the file to write to
    * @param writeSupport the class to convert incoming records
    * @param schema the schema of the records
    * @param extraMetaData extra meta data to write in the footer of the file
-   * @param blockSize the size of a block in the file (this will be approximate)
-   * @param codec the codec used to compress
+   * @param rowGroupSize the size of a block in the file (this will be approximate)
+   * @param compressor the codec used to compress
    */
   public InternalParquetRecordWriter(
-      ParquetFileWriter w,
+      ParquetFileWriter parquetFileWriter,
       WriteSupport<T> writeSupport,
       MessageType schema,
       Map<String, String> extraMetaData,
-      int blockSize,
+      int rowGroupSize,
       int pageSize,
       BytesCompressor compressor,
       int dictionaryPageSize,
       boolean enableDictionary,
       boolean validating,
       WriterVersion writerVersion) {
-    this.w = w;
+    this.parquetFileWriter = parquetFileWriter;
     this.writeSupport = checkNotNull(writeSupport, "writeSupport");
     this.schema = schema;
     this.extraMetaData = extraMetaData;
-    this.blockSize = blockSize;
+    this.rowGroupSize = rowGroupSize;
     this.pageSize = pageSize;
     this.compressor = compressor;
     this.dictionaryPageSize = dictionaryPageSize;
@@ -98,22 +98,22 @@ class InternalParquetRecordWriter<T> {
     // we don't want this number to be too small
     // ideally we divide the block equally across the columns
     // it is unlikely all columns are going to be the same size.
-    int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5);
+    int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5);
     pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize);
     // we don't want this number to be too small either
     // ideally, slightly bigger than the page size, but not bigger than the block buffer
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
-    store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
+    columnStore = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
-    writeSupport.prepareForWrite(columnIO.getRecordWriter(store));
+    writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
   }
 
   public void close() throws IOException, InterruptedException {
-    flushStore();
+    flushRowGroupToStore();
     FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
     Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData);
     finalMetadata.putAll(finalWriteContext.getExtraMetaData());
-    w.end(finalMetadata);
+    parquetFileWriter.end(finalMetadata);
   }
 
   public void write(T value) throws IOException, InterruptedException {
@@ -124,16 +124,16 @@ class InternalParquetRecordWriter<T> {
   private void checkBlockSizeReached() throws IOException {
     if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
-      long memSize = store.memSize();
-      if (memSize > blockSize) {
-        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
-        flushStore();
+      long memSize = columnStore.memSize();
+      if (memSize > rowGroupSize) {
+        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSize, recordCount));
+        flushRowGroupToStore();
         initStore();
         recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
       } else {
         float recordSize = (float) memSize / recordCount;
         recordCountForNextMemCheck = min(
-            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway
+            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSize / recordSize)) / 2), // will check halfway
             recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
             );
         if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
@@ -141,18 +141,22 @@ class InternalParquetRecordWriter<T> {
       }
     }
   }
 
-  private void flushStore()
+  private void flushRowGroupToStore()
       throws IOException {
-    LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
-    if (store.allocatedSize() > 3 * (long) blockSize) {
-      LOG.warn("Too much memory used: " + store.memUsageString());
+    LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.allocatedSize()));
+    if (columnStore.allocatedSize() > 3 * (long)rowGroupSize) {
+      LOG.warn("Too much memory used: " + columnStore.memUsageString());
     }
-    w.startBlock(recordCount);
-    store.flush();
-    pageStore.flushToFileWriter(w);
-    recordCount = 0;
-    w.endBlock();
-    store = null;
+
+    if (recordCount > 0) {
+      parquetFileWriter.startBlock(recordCount);
+      columnStore.flush();
+      pageStore.flushToFileWriter(parquetFileWriter);
+      recordCount = 0;
+      parquetFileWriter.endBlock();
+    }
+
+    columnStore = null;
     pageStore = null;
   }
 }
\ No newline at end of file
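For reference, the adaptive schedule in checkBlockSizeReached() above can be
read in isolation. A self-contained sketch (the two constants mirror the
diff; the standalone method and the name nextMemCheck are illustrative, not
part of the project):

    // Sketch of the schedule used by checkBlockSizeReached(): re-check
    // memory roughly halfway to the projected row-group boundary, but no
    // more than MAXIMUM_RECORD_COUNT_FOR_CHECK records ahead and never
    // before MINIMUM_RECORD_COUNT_FOR_CHECK records in.
    final class MemCheckScheduleSketch {
      static final long MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
      static final long MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;

      static long nextMemCheck(long recordCount, long memSize, long rowGroupSize) {
        float recordSize = (float) memSize / recordCount; // average bytes per record
        return Math.min(
            Math.max(MINIMUM_RECORD_COUNT_FOR_CHECK,
                (recordCount + (long) (rowGroupSize / recordSize)) / 2),
            recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK);
      }

      public static void main(String[] args) {
        // 1,000 records buffered in ~1 MB against a 128 MB row group: the
        // projected boundary is ~128,000 records, halfway is ~64,500, and
        // the cap clamps the next check to 11,000.
        System.out.println(nextMemCheck(1_000, 1 << 20, 128L << 20));
      }
    }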