parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject parquet-mr git commit: PARQUET-373: Fix flaky MemoryManager tests.
Date Mon, 19 Oct 2015 22:51:17 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master b1ea059a6 -> 5294c64b3


PARQUET-373: Fix flaky MemoryManager tests.

Author: Ryan Blue <blue@apache.org>

Closes #269 from rdblue/PARQUET-373-fix-flaky-mem-manager-tests and squashes the following commits:

1b55889 [Ryan Blue] PARQUET-373: Fix flaky MemoryManager tests.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/5294c64b
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/5294c64b
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/5294c64b

Branch: refs/heads/master
Commit: 5294c64b342818e021800b38413f36f426e35b3c
Parents: b1ea059
Author: Ryan Blue <blue@apache.org>
Authored: Mon Oct 19 15:51:07 2015 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Mon Oct 19 15:51:07 2015 -0700

----------------------------------------------------------------------
 .../parquet/hadoop/TestMemoryManager.java       | 167 +++++++++++++------
 1 file changed, 114 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/5294c64b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
index fb71b86..b91fedd 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
@@ -18,14 +18,14 @@
  */
 package org.apache.parquet.hadoop;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageTypeParser;
@@ -44,89 +44,150 @@ public class TestMemoryManager {
       "required int32 line;\n" +
       "required binary content;\n" +
       "}";
-  long expectPoolSize;
-  int rowGroupSize;
+  long expectedPoolSize;
   ParquetOutputFormat parquetOutputFormat;
-  CompressionCodecName codec;
   int counter = 0;
-  boolean firstRegister = true;
 
   @Before
-  public void setUp() {
-    GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema),conf);
-    expectPoolSize = Math.round((double) ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax
-        () * MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
-    rowGroupSize = (int) Math.floor(expectPoolSize / 2);
-    conf.setInt(ParquetOutputFormat.BLOCK_SIZE, rowGroupSize);
-    codec = CompressionCodecName.UNCOMPRESSED;
+  public void setUp() throws Exception {
+    parquetOutputFormat = new ParquetOutputFormat(new GroupWriteSupport());
+
+    GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), conf);
+    expectedPoolSize = Math.round((double)
+        ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() *
+        MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
+
+    long rowGroupSize = expectedPoolSize / 2;
+    conf.setLong(ParquetOutputFormat.BLOCK_SIZE, rowGroupSize);
+
+    // the memory manager is not initialized until a writer is created
+    createWriter(1).close(null);
   }
 
-  @After
-  public void tearDown() throws Exception{
-    FileUtils.deleteDirectory(new File("target/test"));
+  @Test
+  public void testMemoryManagerUpperLimit() {
+    // Verify the memory pool size
+    // this value tends to change a little between setup and tests, so this
+    // validates that it is within 5% of the expected value
+    long poolSize = ParquetOutputFormat.getMemoryManager().getTotalMemoryPool();
+    Assert.assertTrue("Pool size should be within 5% of the expected value",
+        Math.abs(expectedPoolSize - poolSize) < (long) (expectedPoolSize * 0.05));
   }
 
   @Test
   public void testMemoryManager() throws Exception {
-    //Verify the adjusted rowGroupSize of writers
+    long poolSize = ParquetOutputFormat.getMemoryManager().getTotalMemoryPool();
+    long rowGroupSize = poolSize / 2;
+    conf.setLong(ParquetOutputFormat.BLOCK_SIZE, rowGroupSize);
+
+    Assert.assertTrue("Pool should hold 2 full row groups",
+        (2 * rowGroupSize) <= poolSize);
+    Assert.assertTrue("Pool should not hold 3 full row groups",
+        poolSize < (3 * rowGroupSize));
+
+    Assert.assertEquals("Allocations should start out at 0",
+        0, getTotalAllocation());
+
     RecordWriter writer1 = createWriter(1);
-    verifyRowGroupSize(rowGroupSize);
+    Assert.assertTrue("Allocations should never exceed pool size",
+        getTotalAllocation() <= poolSize);
+    Assert.assertEquals("First writer should be limited by row group size",
+        rowGroupSize, getTotalAllocation());
 
     RecordWriter writer2 = createWriter(2);
-    verifyRowGroupSize(rowGroupSize);
+    Assert.assertTrue("Allocations should never exceed pool size",
+        getTotalAllocation() <= poolSize);
+    Assert.assertEquals("Second writer should be limited by row group size",
+        2 * rowGroupSize, getTotalAllocation());
 
     RecordWriter writer3 = createWriter(3);
-    verifyRowGroupSize((int) Math.floor(expectPoolSize / 3));
+    Assert.assertTrue("Allocations should never exceed pool size",
+        getTotalAllocation() <= poolSize);
 
     writer1.close(null);
-    verifyRowGroupSize(rowGroupSize);
+    Assert.assertTrue("Allocations should never exceed pool size",
+        getTotalAllocation() <= poolSize);
+    Assert.assertEquals("Allocations should be increased to the row group size",
+        2 * rowGroupSize, getTotalAllocation());
 
     writer2.close(null);
-    verifyRowGroupSize(rowGroupSize);
+    Assert.assertTrue("Allocations should never exceed pool size",
+        getTotalAllocation() <= poolSize);
+    Assert.assertEquals("Allocations should be increased to the row group size",
+        rowGroupSize, getTotalAllocation());
 
     writer3.close(null);
+    Assert.assertEquals("Allocations should be increased to the row group size",
+        0, getTotalAllocation());
+  }
 
-    //Verify the memory pool
-    Assert.assertEquals("memory pool size is incorrect.", expectPoolSize,
-        parquetOutputFormat.getMemoryManager().getTotalMemoryPool());
+  @Test
+  public void testReallocationCallback() throws Exception {
+    // validate assumptions
+    long poolSize = ParquetOutputFormat.getMemoryManager().getTotalMemoryPool();
+    long rowGroupSize = poolSize / 2;
+    conf.setLong(ParquetOutputFormat.BLOCK_SIZE, rowGroupSize);
+
+    Assert.assertTrue("Pool should hold 2 full row groups",
+        (2 * rowGroupSize) <= poolSize);
+    Assert.assertTrue("Pool should not hold 3 full row groups",
+        poolSize < (3 * rowGroupSize));
+
+    Runnable callback = new Runnable() {
+      @Override
+      public void run() {
+        counter++;
+      }
+    };
 
-    //Verify Callback mechanism
-    Assert.assertEquals("counter calculated by callback is incorrect.", 1, counter);
-    Assert.assertEquals("CallBack is duplicated.", 1, parquetOutputFormat.getMemoryManager()
-        .getScaleCallBacks().size());
-  }
+    // first-time registration should succeed
+    ParquetOutputFormat.getMemoryManager()
+        .registerScaleCallBack("increment-test-counter", callback);
 
-  private RecordWriter createWriter(int index) throws Exception{
-    Path file = new Path("target/test/", "parquet" + index);
-    parquetOutputFormat = new ParquetOutputFormat(new GroupWriteSupport());
-    RecordWriter writer = parquetOutputFormat.getRecordWriter(conf, file, codec);
     try {
-      parquetOutputFormat.getMemoryManager().registerScaleCallBack("increment-test-counter",
-          new Runnable() {
-            @Override
-            public void run() {
-              counter++;
-            }
-          });
-      if (!firstRegister) {
-        Assert.fail("Duplicated registering callback should throw duplicates exception.");
-      }
-      firstRegister = false;
+      ParquetOutputFormat.getMemoryManager()
+          .registerScaleCallBack("increment-test-counter", callback);
+      Assert.fail("Duplicated registering callback should throw duplicates exception.");
     } catch (IllegalArgumentException e) {
-      if (firstRegister) {
-        Assert.fail("Registering the same callback first time should succeed.");
-      }
+      // expected
+    }
+
+    // hit the limit once and clean up
+    RecordWriter writer1 = createWriter(1);
+    RecordWriter writer2 = createWriter(2);
+    RecordWriter writer3 = createWriter(3);
+    writer1.close(null);
+    writer2.close(null);
+    writer3.close(null);
+
+    //Verify Callback mechanism
+    Assert.assertEquals("Allocations should be adjusted once", 1, counter);
+    Assert.assertEquals("Should not allow duplicate callbacks",
+        1, ParquetOutputFormat.getMemoryManager().getScaleCallBacks().size());
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private RecordWriter createWriter(int index) throws Exception {
+    File file = temp.newFile(String.valueOf(index) + ".parquet");
+    if (!file.delete()) {
+      throw new RuntimeException("Could not delete file: " + file);
     }
+    RecordWriter writer = parquetOutputFormat.getRecordWriter(
+        conf, new Path(file.toString()),
+        CompressionCodecName.UNCOMPRESSED);
 
     return writer;
   }
 
-  private void verifyRowGroupSize(int expectRowGroupSize) {
-    Set<InternalParquetRecordWriter> writers = parquetOutputFormat.getMemoryManager()
-        .getWriterList().keySet();
+  private long getTotalAllocation() {
+    Set<InternalParquetRecordWriter> writers = ParquetOutputFormat
+        .getMemoryManager().getWriterList().keySet();
+    long total = 0;
     for (InternalParquetRecordWriter writer : writers) {
-      Assert.assertEquals("wrong rowGroupSize", expectRowGroupSize,
-          writer.getRowGroupSizeThreshold(), 1);
+      total += writer.getRowGroupSizeThreshold();
     }
+    return total;
   }
 }


Mime
View raw message