Repository: incubator-parquet-mr
Updated Branches:
refs/heads/master bf20abbf4 -> 0b17cbee9
PARQUET-104: Fix writing empty row group at the end of the file
At the end of a Parquet file, an empty row group may be written.
This happens when numberOfRecords mod sizeOfRowGroup = 0.
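The essence of the fix is a guard in the final flush: a row group is started
only when records are actually buffered, so closing a file whose record count
is an exact multiple of the row group size no longer emits a trailing empty
row group. The key change, excerpted from the diff below (with the fields as
renamed in this commit):

    private void flushRowGroupToStore() throws IOException {
      // Skip startBlock()/endBlock() entirely when nothing is buffered;
      // previously close() wrote an empty row group in that case.
      if (recordCount > 0) {
        parquetFileWriter.startBlock(recordCount);
        columnStore.flush();
        pageStore.flushToFileWriter(parquetFileWriter);
        recordCount = 0;
        parquetFileWriter.endBlock();
      }
      columnStore = null;
      pageStore = null;
    }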
Author: Tianshuo Deng <tdeng@twitter.com>
Closes #66 from tsdeng/fix_empty_row_group and squashes the following commits:
10b93fb [Tianshuo Deng] rename
e3a5896 [Tianshuo Deng] format
91fa0d4 [Tianshuo Deng] fix empty row group
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/0b17cbee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/0b17cbee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/0b17cbee
Branch: refs/heads/master
Commit: 0b17cbee9541998df66d33c8a99b675ced80d9aa
Parents: bf20abb
Author: Tianshuo Deng <tdeng@twitter.com>
Authored: Mon Sep 29 12:00:03 2014 -0700
Committer: julien <julien@twitter.com>
Committed: Mon Sep 29 12:00:03 2014 -0700
----------------------------------------------------------------------
.../hadoop/InternalParquetRecordWriter.java | 64 +++++++++++---------
1 file changed, 34 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/0b17cbee/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index 727d21d..d73c811 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -42,11 +42,11 @@ class InternalParquetRecordWriter<T> {
private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
- private final ParquetFileWriter w;
+ private final ParquetFileWriter parquetFileWriter;
private final WriteSupport<T> writeSupport;
private final MessageType schema;
private final Map<String, String> extraMetaData;
- private final int blockSize;
+ private final int rowGroupSize;
private final int pageSize;
private final BytesCompressor compressor;
private final int dictionaryPageSize;
@@ -57,34 +57,34 @@ class InternalParquetRecordWriter<T> {
private long recordCount = 0;
private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
- private ColumnWriteStoreImpl store;
+ private ColumnWriteStoreImpl columnStore;
private ColumnChunkPageWriteStore pageStore;
/**
- * @param w the file to write to
+ * @param parquetFileWriter the file to write to
* @param writeSupport the class to convert incoming records
* @param schema the schema of the records
* @param extraMetaData extra meta data to write in the footer of the file
- * @param blockSize the size of a block in the file (this will be approximate)
- * @param codec the codec used to compress
+ * @param rowGroupSize the size of a block in the file (this will be approximate)
+ * @param compressor the codec used to compress
*/
public InternalParquetRecordWriter(
- ParquetFileWriter w,
+ ParquetFileWriter parquetFileWriter,
WriteSupport<T> writeSupport,
MessageType schema,
Map<String, String> extraMetaData,
- int blockSize,
+ int rowGroupSize,
int pageSize,
BytesCompressor compressor,
int dictionaryPageSize,
boolean enableDictionary,
boolean validating,
WriterVersion writerVersion) {
- this.w = w;
+ this.parquetFileWriter = parquetFileWriter;
this.writeSupport = checkNotNull(writeSupport, "writeSupport");
this.schema = schema;
this.extraMetaData = extraMetaData;
- this.blockSize = blockSize;
+ this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
this.compressor = compressor;
this.dictionaryPageSize = dictionaryPageSize;
@@ -98,22 +98,22 @@ class InternalParquetRecordWriter<T> {
// we don't want this number to be too small
// ideally we divide the block equally across the columns
// it is unlikely all columns are going to be the same size.
- int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5);
+ int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5);
pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize);
// we don't want this number to be too small either
// ideally, slightly bigger than the page size, but not bigger than the block buffer
int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
- store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
+ columnStore = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
- writeSupport.prepareForWrite(columnIO.getRecordWriter(store));
+ writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
}
public void close() throws IOException, InterruptedException {
- flushStore();
+ flushRowGroupToStore();
FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData);
finalMetadata.putAll(finalWriteContext.getExtraMetaData());
- w.end(finalMetadata);
+ parquetFileWriter.end(finalMetadata);
}
public void write(T value) throws IOException, InterruptedException {
@@ -124,16 +124,16 @@ class InternalParquetRecordWriter<T> {
private void checkBlockSizeReached() throws IOException {
if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
- long memSize = store.memSize();
- if (memSize > blockSize) {
- LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize,
blockSize, recordCount));
- flushStore();
+ long memSize = columnStore.memSize();
+ if (memSize > rowGroupSize) {
+ LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize,
rowGroupSize, recordCount));
+ flushRowGroupToStore();
initStore();
recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
} else {
float recordSize = (float) memSize / recordCount;
recordCountForNextMemCheck = min(
- max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway
+ max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSize / recordSize)) / 2), // will check halfway
recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
);
if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount,
recordCountForNextMemCheck));
@@ -141,18 +141,22 @@ class InternalParquetRecordWriter<T> {
}
}
- private void flushStore()
+ private void flushRowGroupToStore()
throws IOException {
- LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
- if (store.allocatedSize() > 3 * (long) blockSize) {
- LOG.warn("Too much memory used: " + store.memUsageString());
+ LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.allocatedSize()));
+ if (columnStore.allocatedSize() > 3 * (long)rowGroupSize) {
+ LOG.warn("Too much memory used: " + columnStore.memUsageString());
}
- w.startBlock(recordCount);
- store.flush();
- pageStore.flushToFileWriter(w);
- recordCount = 0;
- w.endBlock();
- store = null;
+
+ if (recordCount > 0) {
+ parquetFileWriter.startBlock(recordCount);
+ columnStore.flush();
+ pageStore.flushToFileWriter(parquetFileWriter);
+ recordCount = 0;
+ parquetFileWriter.endBlock();
+ }
+
+ columnStore = null;
pageStore = null;
}
}
\ No newline at end of file
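A simple way to verify the fix is to write a file whose record count is an
exact multiple of the row group size, then read back its footer and assert
that no row group is empty. A minimal sketch, not part of this commit; the
class and method names are illustrative, but only footer APIs from this
codebase's parquet.hadoop packages are used:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import parquet.hadoop.ParquetFileReader;
    import parquet.hadoop.metadata.BlockMetaData;
    import parquet.hadoop.metadata.ParquetMetadata;

    public class NoEmptyRowGroupCheck {
      // Reads the footer of an already-written Parquet file and fails if any
      // row group (block) is empty. Before this fix, a file whose record
      // count was an exact multiple of the row group size ended with one.
      public static void check(Configuration conf, Path file) throws IOException {
        ParquetMetadata footer = ParquetFileReader.readFooter(conf, file);
        for (BlockMetaData block : footer.getBlocks()) {
          if (block.getRowCount() == 0) {
            throw new IllegalStateException("empty row group in " + file);
          }
        }
      }
    }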