parquet-commits mailing list archives

From br...@apache.org
Subject incubator-parquet-mr git commit: PARQUET-108: Parquet Memory Management in Java
Date Mon, 29 Dec 2014 15:17:43 GMT
Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 8e2ea92ee -> 23db4eb88


PARQUET-108: Parquet Memory Management in Java

When Parquet tries to write very large "row groups", tasks may run out of memory during
dynamic partitioning, when a reducer may have many Parquet files open at a given time.

This patch implements a memory manager to control the total memory size used by writers and
balance their memory usage, which ensures that we don't run out of memory due to writing too
many row groups within a single JVM.
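
For illustration, a minimal standalone sketch of the balancing arithmetic described above
(the writer count and request size are made up; the actual logic lives in the new
MemoryManager.updateAllocation() further down):

    import java.lang.management.ManagementFactory;

    public class MemoryPoolExample {
      public static void main(String[] args) {
        // Pool size: a configurable fraction (default 0.95) of the JVM max heap,
        // mirroring what the MemoryManager constructor computes.
        long maxHeap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
        double ratio = 0.95;
        long pool = Math.round(maxHeap * ratio);

        // Hypothetical scenario: three open writers, each asking for a 128 MB row group buffer.
        long requested = 128L * 1024 * 1024;
        long total = 3 * requested;

        // If the combined requests exceed the pool, every writer's row group size
        // threshold is scaled down by the same ratio; otherwise it is left unchanged.
        double scale = (total <= pool) ? 1.0 : (double) pool / total;
        long perWriter = (long) Math.floor(requested * scale);
        System.out.printf("pool=%,d requested total=%,d per-writer threshold=%,d%n",
            pool, total, perWriter);
      }
    }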

Author: dongche1 <dong1.chen@intel.com>

Closes #80 from dongche/master and squashes the following commits:

e511f85 [dongche1] Merge remote branch 'upstream/master'
60a96b5 [dongche1] Merge remote branch 'upstream/master'
2d17212 [dongche1] improve MemoryManger instantiation, change access level
6e9333e [dongche1] change blocksize type from int to long
e07b16e [dongche1] Refine updateAllocation(), addWriter(). Remove redundant getMemoryPoolRatio
9a0a831 [dongche1] log the inconsistent ratio config instead of throwing an exception
3a35d22 [dongche1] Move the creation of MemoryManager. Throw exception instead of logging it
aeda7bc [dongche1] PARQUET-108: Parquet Memory Management in Java
c883bba [dongche1] PARQUET-108: Parquet Memory Management in Java
7b45b2c [dongche1] PARQUET-108: Parquet Memory Management in Java
6d766aa [dongche1] PARQUET-108: Parquet Memory Management in Java --- address some comments
3abfe2b [dongche1] parquet 108


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/23db4eb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/23db4eb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/23db4eb8

Branch: refs/heads/master
Commit: 23db4eb88aa018da25563586bab322e7c1867ad5
Parents: 8e2ea92
Author: dongche1 <dong1.chen@intel.com>
Authored: Mon Dec 29 09:17:34 2014 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Mon Dec 29 09:17:34 2014 -0600

----------------------------------------------------------------------
 .../hadoop/InternalParquetRecordWriter.java     |  28 +++-
 .../main/java/parquet/hadoop/MemoryManager.java | 137 +++++++++++++++++++
 .../java/parquet/hadoop/ParquetInputSplit.java  |  23 +++-
 .../parquet/hadoop/ParquetOutputFormat.java     |  30 +++-
 .../parquet/hadoop/ParquetRecordWriter.java     |  41 +++++-
 .../java/parquet/hadoop/TestInputFormat.java    |  38 +++++
 .../java/parquet/hadoop/TestMemoryManager.java  | 103 ++++++++++++++
 7 files changed, 385 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/23db4eb8/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index 79ef5ac..5026bbe 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import parquet.Ints;
 import parquet.Log;
 import parquet.column.ColumnWriteStore;
 import parquet.column.ParquetProperties;
@@ -49,7 +50,8 @@ class InternalParquetRecordWriter<T> {
   private final WriteSupport<T> writeSupport;
   private final MessageType schema;
   private final Map<String, String> extraMetaData;
-  private final int rowGroupSize;
+  private final long rowGroupSize;
+  private long rowGroupSizeThreshold;
   private final int pageSize;
   private final BytesCompressor compressor;
   private final boolean validating;
@@ -75,7 +77,7 @@ class InternalParquetRecordWriter<T> {
       WriteSupport<T> writeSupport,
       MessageType schema,
       Map<String, String> extraMetaData,
-      int rowGroupSize,
+      long rowGroupSize,
       int pageSize,
       BytesCompressor compressor,
       int dictionaryPageSize,
@@ -87,6 +89,7 @@ class InternalParquetRecordWriter<T> {
     this.schema = schema;
     this.extraMetaData = extraMetaData;
     this.rowGroupSize = rowGroupSize;
+    this.rowGroupSizeThreshold = rowGroupSize;
     this.pageSize = pageSize;
     this.compressor = compressor;
     this.validating = validating;
@@ -98,7 +101,10 @@ class InternalParquetRecordWriter<T> {
     // we don't want this number to be too small
     // ideally we divide the block equally across the columns
     // it is unlikely all columns are going to be the same size.
-    int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5);
+    // its value is likely below Integer.MAX_VALUE (2GB), although rowGroupSize is a long type.
+    // therefore this size is cast to int, since allocating byte array in under layer needs to
+    // limit the array size in an int scope.
+    int initialBlockBufferSize = Ints.checkedCast(max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5));
     pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize);
     // we don't want this number to be too small either
     // ideally, slightly bigger than the page size, but not bigger than the block buffer
@@ -129,15 +135,15 @@ class InternalParquetRecordWriter<T> {
   private void checkBlockSizeReached() throws IOException {
     if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
       long memSize = columnStore.getBufferedSize();
-      if (memSize > rowGroupSize) {
-        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSize, recordCount));
+      if (memSize > rowGroupSizeThreshold) {
+        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSizeThreshold, recordCount));
         flushRowGroupToStore();
         initStore();
         recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
       } else {
         float recordSize = (float) memSize / recordCount;
         recordCountForNextMemCheck = min(
-            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSize / recordSize)) / 2), // will check halfway
+            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSizeThreshold / recordSize)) / 2), // will check halfway
             recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
             );
         if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
@@ -148,7 +154,7 @@ class InternalParquetRecordWriter<T> {
   private void flushRowGroupToStore()
       throws IOException {
     LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
-    if (columnStore.getAllocatedSize() > 3 * (long)rowGroupSize) {
+    if (columnStore.getAllocatedSize() > 3 * (long)rowGroupSizeThreshold) {
       LOG.warn("Too much memory used: " + columnStore.memUsageString());
     }
 
@@ -163,4 +169,12 @@ class InternalParquetRecordWriter<T> {
     columnStore = null;
     pageStore = null;
   }
+
+  long getRowGroupSizeThreshold() {
+    return rowGroupSizeThreshold;
+  }
+
+  void setRowGroupSizeThreshold(long rowGroupSizeThreshold) {
+    this.rowGroupSizeThreshold = rowGroupSizeThreshold;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/23db4eb8/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java b/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
new file mode 100644
index 0000000..f04822f
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.hadoop;
+
+import parquet.Log;
+
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements a memory manager that keeps a global context of how many Parquet
+ * writers there are and manages the memory between them. For use cases with
+ * dynamic partitions, it is easy to end up with many writers in the same task.
+ * By managing the size of each allocation, we try to cut down the size of each
+ * allocation and keep the task from running out of memory.
+ *
+ * This class balances the allocation sizes of the writers by resizing them proportionally.
+ * When the sum of the writers' allocation sizes is less than the total memory pool,
+ * each writer keeps its original allocation.
+ * When the sum exceeds the pool, each writer's allocation size is scaled down by the same ratio.
+ */
+public class MemoryManager {
+  private static final Log LOG = Log.getLog(MemoryManager.class);
+  static final float DEFAULT_MEMORY_POOL_RATIO = 0.95f;
+  private final float memoryPoolRatio;
+
+  private final long totalMemoryPool;
+  private final Map<InternalParquetRecordWriter, Long> writerList = new
+      HashMap<InternalParquetRecordWriter, Long>();
+
+  public MemoryManager(float ratio) {
+    checkRatio(ratio);
+
+    memoryPoolRatio = ratio;
+    totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax
+        () * ratio);
+    LOG.debug(String.format("Allocated total memory pool is: %,d", totalMemoryPool));
+  }
+
+  private void checkRatio(float ratio) {
+    if (ratio <= 0 || ratio > 1) {
+      throw new IllegalArgumentException("The configured memory pool ratio " + ratio + "
is " +
+          "not between 0 and 1.");
+    }
+  }
+
+  /**
+   * Add a new writer and its memory allocation to the memory manager.
+   * @param writer the new created writer
+   * @param allocation the requested buffer size
+   */
+  synchronized void addWriter(InternalParquetRecordWriter writer, Long allocation) {
+    Long oldValue = writerList.get(writer);
+    if (oldValue == null) {
+      writerList.put(writer, allocation);
+    } else {
+      throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add
an " +
+          "instance of InternalParquetRecordWriter more than once. The Manager already contains
" +
+          "the writer: " + writer);
+    }
+    updateAllocation();
+  }
+
+  /**
+   * Remove the given writer from the memory manager.
+   * @param writer the writer that has been closed
+   */
+  synchronized void removeWriter(InternalParquetRecordWriter writer) {
+    if (writerList.containsKey(writer)) {
+      writerList.remove(writer);
+    }
+    if (!writerList.isEmpty()) {
+      updateAllocation();
+    }
+  }
+
+  /**
+   * Update the allocated size of each writer based on the current allocations and pool size.
+   */
+  private void updateAllocation() {
+    long totalAllocations = 0;
+    double scale;
+    for (Long allocation : writerList.values()) {
+      totalAllocations += allocation;
+    }
+    if (totalAllocations <= totalMemoryPool) {
+      scale = 1.0;
+    } else {
+      scale = (double) totalMemoryPool / totalAllocations;
+    }
+
+    for (Map.Entry<InternalParquetRecordWriter, Long> entry : writerList.entrySet()) {
+      long newSize = (long) Math.floor(entry.getValue() * scale);
+      entry.getKey().setRowGroupSizeThreshold(newSize);
+      LOG.debug(String.format("Adjust block size from %,d to %,d for writer: %s",
+            entry.getValue(), newSize, entry.getKey()));
+    }
+  }
+
+  /**
+   * Get the total memory pool size that is available for writers.
+   * @return the number of bytes in the memory pool
+   */
+  long getTotalMemoryPool() {
+    return totalMemoryPool;
+  }
+
+  /**
+   * Get the writers list
+   * @return the writers in this memory manager
+   */
+  Map<InternalParquetRecordWriter, Long> getWriterList() {
+    return writerList;
+  }
+
+  /**
+   * Get the ratio of memory allocated for all the writers.
+   * @return the memory pool ratio
+   */
+  float getMemoryPoolRatio() {
+    return memoryPoolRatio;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/23db4eb8/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
index 133d5be..a8c2488 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
@@ -37,6 +37,9 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 import parquet.bytes.BytesUtils;
 import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
 
 /**
  * An input split for the Parquet format
@@ -88,21 +91,31 @@ public class ParquetInputSplit extends FileSplit implements Writable {
       Map<String, String> extraMetadata,
       Map<String, String> readSupportMetadata) {
     this(
-        path, start, length, end(blocks), hosts,
+        path, start, length, end(blocks, requestedSchema), hosts,
         offsets(blocks),
         requestedSchema, readSupportMetadata
         );
   }
 
-  private static long end(List<BlockMetaData> blocks) {
-    BlockMetaData last = blocks.get(blocks.size() - 1);
-    return last.getStartingPos() + last.getCompressedSize();
+  private static long end(List<BlockMetaData> blocks, String requestedSchema) {
+    MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
+    long length = 0;
+
+    for (BlockMetaData block : blocks) {
+      List<ColumnChunkMetaData> columns = block.getColumns();
+      for (ColumnChunkMetaData column : columns) {
+        if (requested.containsPath(column.getPath().toArray())) {
+          length += column.getTotalSize();
+        }
+      }
+    }
+    return length;
   }
 
   private static long[] offsets(List<BlockMetaData> blocks) {
     long[] offsets = new long[blocks.size()];
     for (int i = 0; i < offsets.length; i++) {
-      offsets[i] = blocks.get(0).getStartingPos();
+      offsets[i] = blocks.get(i).getStartingPos();
     }
     return offsets;
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/23db4eb8/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
index 98e73e0..f56e065 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
@@ -104,6 +104,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public static final String VALIDATION           = "parquet.validation";
   public static final String WRITER_VERSION       = "parquet.writer.version";
   public static final String ENABLE_JOB_SUMMARY   = "parquet.enable.summary-metadata";
+  public static final String MEMORY_POOL_RATIO    = "parquet.memory.pool.ratio";
 
   public static void setWriteSupportClass(Job job,  Class<?> writeSupportClass) {
     getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
@@ -178,10 +179,15 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     return configuration.getBoolean(ENABLE_DICTIONARY, true);
   }
 
+  @Deprecated
   public static int getBlockSize(Configuration configuration) {
     return configuration.getInt(BLOCK_SIZE, DEFAULT_BLOCK_SIZE);
   }
 
+  public static long getLongBlockSize(Configuration configuration) {
+    return configuration.getLong(BLOCK_SIZE, DEFAULT_BLOCK_SIZE);
+  }
+
   public static int getPageSize(Configuration configuration) {
     return configuration.getInt(PAGE_SIZE, DEFAULT_PAGE_SIZE);
   }
@@ -262,7 +268,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     final WriteSupport<T> writeSupport = getWriteSupport(conf);
 
     CodecFactory codecFactory = new CodecFactory(conf);
-    int blockSize = getBlockSize(conf);
+    long blockSize = getLongBlockSize(conf);
     if (INFO) LOG.info("Parquet block size to " + blockSize);
     int pageSize = getPageSize(conf);
     if (INFO) LOG.info("Parquet page size to " + pageSize);
@@ -279,6 +285,15 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);
     w.start();
 
+    float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
+        MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
+    if (memoryManager == null) {
+      memoryManager = new MemoryManager(maxLoad);
+    } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
+      LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not "
+
+          "be reset by the new value: " + maxLoad);
+    }
+
     return new ParquetRecordWriter<T>(
         w,
         writeSupport,
@@ -289,7 +304,8 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
         dictionaryPageSize,
         enableDictionary,
         validating,
-        writerVersion);
+        writerVersion,
+        memoryManager);
   }
 
   /**
@@ -318,4 +334,14 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     }
     return committer;
   }
+
+
+  /**
+   * This memory manager is for all the real writers (InternalParquetRecordWriter) in one task.
+   */
+  private static MemoryManager memoryManager;
+
+  static MemoryManager getMemoryManager() {
+    return memoryManager;
+  }
 }
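
A minimal sketch of setting the new knob from user code (the 0.8 value is only an example;
the key name and the 0.95 default come from the patch above):

    import org.apache.hadoop.conf.Configuration;

    public class MemoryPoolRatioConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Cap the combined row-group buffers of all writers in a task at 80% of the
        // JVM max heap instead of the default 95% (MemoryManager.DEFAULT_MEMORY_POOL_RATIO).
        conf.setFloat("parquet.memory.pool.ratio", 0.8f);

        // ParquetOutputFormat.getRecordWriter() reads the setting the same way before
        // creating (or reusing) the task-wide MemoryManager.
        float maxLoad = conf.getFloat("parquet.memory.pool.ratio", 0.95f);
        System.out.println("memory pool ratio = " + maxLoad);
      }
    }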

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/23db4eb8/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java
index fac67fe..53fe3ac 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java
@@ -25,6 +25,8 @@ import parquet.hadoop.CodecFactory.BytesCompressor;
 import parquet.hadoop.api.WriteSupport;
 import parquet.schema.MessageType;
 
+import static parquet.Preconditions.checkNotNull;
+
 /**
  * Writes records to a Parquet file
  *
@@ -37,6 +39,7 @@ import parquet.schema.MessageType;
 public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
 
   private InternalParquetRecordWriter<T> internalWriter;
+  private MemoryManager memoryManager;
 
   /**
    *
@@ -50,6 +53,7 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
    * @param enableDictionary to enable the dictionary
    * @param validating if schema validation should be turned on
    */
+  @Deprecated
   public ParquetRecordWriter(
       ParquetFileWriter w,
       WriteSupport<T> writeSupport,
@@ -62,7 +66,39 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
       boolean validating,
       WriterVersion writerVersion) {
     internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
-        extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary, validating, writerVersion);
+        extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
+        validating, writerVersion);
+  }
+
+  /**
+   *
+   * @param w the file to write to
+   * @param writeSupport the class to convert incoming records
+   * @param schema the schema of the records
+   * @param extraMetaData extra meta data to write in the footer of the file
+   * @param blockSize the size of a block in the file (this will be approximate)
+   * @param compressor the compressor used to compress the pages
+   * @param dictionaryPageSize the threshold for dictionary size
+   * @param enableDictionary to enable the dictionary
+   * @param validating if schema validation should be turned on
+   */
+  public ParquetRecordWriter(
+      ParquetFileWriter w,
+      WriteSupport<T> writeSupport,
+      MessageType schema,
+      Map<String, String> extraMetaData,
+      long blockSize, int pageSize,
+      BytesCompressor compressor,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      WriterVersion writerVersion,
+      MemoryManager memoryManager) {
+    internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
+        extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
+        validating, writerVersion);
+    this.memoryManager = checkNotNull(memoryManager, "memoryManager");
+    memoryManager.addWriter(internalWriter, blockSize);
   }
 
   /**
@@ -71,6 +107,9 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
   @Override
   public void close(TaskAttemptContext context) throws IOException, InterruptedException {
     internalWriter.close();
+    if (memoryManager != null) {
+      memoryManager.removeWriter(internalWriter);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/23db4eb8/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
index bac4fb0..98fd27e 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
@@ -516,6 +516,18 @@ public class TestInputFormat {
     assertFalse(cacheValue.isNewerThan(newerCacheValue));
   }
 
+  @Test
+  public void testDeprecatedConstructorOfParquetInputSplit() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByDeprecatedConstructor(50, 50);
+
+    shouldSplitBlockSizeBe(splits, 5, 5);
+    shouldOneSplitRowGroupOffsetBe(splits.get(0), 0, 10, 20, 30, 40);
+    shouldOneSplitRowGroupOffsetBe(splits.get(1), 50, 60, 70, 80, 90);
+    shouldSplitLengthBe(splits, 50, 50);
+    shouldSplitStartBe(splits, 0, 50);
+  }
+
   private File getTempFile() throws IOException {
     File tempFile = File.createTempFile("footer_", ".txt");
     tempFile.deleteOnExit();
@@ -558,6 +570,25 @@ public class TestInputFormat {
         min, max);
   }
 
+  private List<ParquetInputSplit> generateSplitByDeprecatedConstructor(long min, long max) throws
+      IOException {
+    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
+    List<ClientSideMetadataSplitStrategy.SplitInfo> splitInfos = ClientSideMetadataSplitStrategy
+        .generateSplitInfo(blocks, hdfsBlocks, min, max);
+
+    for (ClientSideMetadataSplitStrategy.SplitInfo splitInfo : splitInfos) {
+      BlockMetaData lastRowGroup = splitInfo.getRowGroups().get(splitInfo.getRowGroupCount() - 1);
+      long end = lastRowGroup.getStartingPos() + lastRowGroup.getTotalByteSize();
+
+      ParquetInputSplit split = new ParquetInputSplit(fileStatus.getPath(),
+          splitInfo.hdfsBlock.getOffset(), end, splitInfo.hdfsBlock.getHosts(),
+          splitInfo.rowGroups, schema.toString(), null, null, extramd);
+      splits.add(split);
+    }
+
+    return splits;
+  }
+
   private void shouldSplitStartBe(List<ParquetInputSplit> splits, long... offsets) {
     assertEquals(message(splits), offsets.length, splits.size());
     for (int i = 0; i < offsets.length; i++) {
@@ -582,6 +613,13 @@ public class TestInputFormat {
     }
   }
 
+  private void shouldOneSplitRowGroupOffsetBe(ParquetInputSplit split, int... rowGroupOffsets) {
+    assertEquals(split.toString(), rowGroupOffsets.length, split.getRowGroupOffsets().length);
+    for (int i = 0; i < rowGroupOffsets.length; i++) {
+      assertEquals(split.toString(), rowGroupOffsets[i], split.getRowGroupOffsets()[i]);
+    }
+  }
+
   private String message(List<ParquetInputSplit> splits) {
     return String.valueOf(splits) + " " + Arrays.toString(hdfsBlocks) + "\n";
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/23db4eb8/parquet-hadoop/src/test/java/parquet/hadoop/TestMemoryManager.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestMemoryManager.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestMemoryManager.java
new file mode 100644
index 0000000..da06cef
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestMemoryManager.java
@@ -0,0 +1,103 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.hadoop;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import parquet.hadoop.example.GroupWriteSupport;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.MessageTypeParser;
+
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.util.Set;
+
+/**
+ * Verify MemoryManager could adjust its writers' allocated memory size.
+ */
+public class TestMemoryManager {
+
+  Configuration conf = new Configuration();
+  String writeSchema = "message example {\n" +
+      "required int32 line;\n" +
+      "required binary content;\n" +
+      "}";
+  long expectPoolSize;
+  int rowGroupSize;
+  ParquetOutputFormat parquetOutputFormat;
+  CompressionCodecName codec;
+
+  @Before
+  public void setUp() {
+    GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema),conf);
+    expectPoolSize = Math.round(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax
+        () * MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
+    rowGroupSize = (int) Math.floor(expectPoolSize / 2);
+    conf.setInt(ParquetOutputFormat.BLOCK_SIZE, rowGroupSize);
+    codec = CompressionCodecName.UNCOMPRESSED;
+  }
+
+  @After
+  public void tearDown() throws Exception{
+    FileUtils.deleteDirectory(new File("target/test"));
+  }
+
+  @Test
+  public void testMemoryManager() throws Exception {
+    //Verify the adjusted rowGroupSize of writers
+    RecordWriter writer1 = createWriter(1);
+    verifyRowGroupSize(rowGroupSize);
+
+    RecordWriter writer2 = createWriter(2);
+    verifyRowGroupSize(rowGroupSize);
+
+    RecordWriter writer3 = createWriter(3);
+    verifyRowGroupSize((int) Math.floor(expectPoolSize / 3));
+
+    writer1.close(null);
+    verifyRowGroupSize(rowGroupSize);
+
+    writer2.close(null);
+    verifyRowGroupSize(rowGroupSize);
+
+    writer3.close(null);
+
+    //Verify the memory pool
+    Assert.assertEquals("memory pool size is incorrect.", expectPoolSize,
+        parquetOutputFormat.getMemoryManager().getTotalMemoryPool());
+  }
+
+  private RecordWriter createWriter(int index) throws Exception{
+    Path file = new Path("target/test/", "parquet" + index);
+    parquetOutputFormat = new ParquetOutputFormat(new GroupWriteSupport());
+    return parquetOutputFormat.getRecordWriter(conf, file, codec);
+  }
+
+  private void verifyRowGroupSize(int expectRowGroupSize) {
+    Set<InternalParquetRecordWriter> writers = parquetOutputFormat.getMemoryManager()
+        .getWriterList().keySet();
+    for (InternalParquetRecordWriter writer : writers) {
+      Assert.assertEquals("wrong rowGroupSize", expectRowGroupSize,
+          writer.getRowGroupSizeThreshold(), 1);
+    }
+  }
+}

