parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject parquet-mr git commit: PARQUET-164: Add a counter and increment when parquet memory manager kicks in
Date Tue, 19 May 2015 18:26:16 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master a458e1a2f -> 181affd5c


PARQUET-164: Add a counter and increment when parquet memory manager kicks in

Add a counter for writers, and increment it when the memory manager scales down the row group size.

Hive could use this counter to warn users.

Author: dongche1 <dong1.chen@intel.com>
Author: dongche <dong1.chen@intel.com>
Author: root <root@bdpe15.sh.intel.com>

Closes #120 from dongche/PARQUET-164 and squashes the following commits:

9bcb1ba [dongche] Remove stats, and change returned callbacks map unmodifiable
3cbbeb9 [dongche] Merge remote branch 'upstream1/master' into PARQUET-164
bdef233 [dongche] Merge remote branch 'upstream1/master' into PARQUET-164
780be6d [root] revert change about callable and address comments
11f9163 [dongche1] Merge remote branch 'upstream/master' into PARQUET-164
55549a5 [dongche1] Use callable and strict registerScallCallBack method.
74054aa [dongche1] Use Runnable as a generic callback
8782a02 [dongche1] Add a callback mechanism instead of shims. And rebase trunk
b138b7f [dongche1] Merge remote branch 'upstream/master' into PARQUET-164
93a4678 [dongche1] PARQUET-164: Add a counter and increment when parquet memory manager kicks in


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/181affd5
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/181affd5
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/181affd5

Branch: refs/heads/master
Commit: 181affd5c937755f54ff31ef056a6ec091e95f51
Parents: a458e1a
Author: dongche1 <dong1.chen@intel.com>
Authored: Tue May 19 11:26:07 2015 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Tue May 19 11:26:07 2015 -0700

----------------------------------------------------------------------
 .../apache/parquet/hadoop/MemoryManager.java    | 42 +++++++++++++++++++-
 .../parquet/hadoop/ParquetOutputFormat.java     |  2 +-
 .../parquet/hadoop/TestMemoryManager.java       | 28 ++++++++++++-
 3 files changed, 69 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/181affd5/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java
index ec70b87..f997d08 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java
@@ -20,8 +20,10 @@ package org.apache.parquet.hadoop;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.Preconditions;
 
 import java.lang.management.ManagementFactory;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -47,6 +49,8 @@ public class MemoryManager {
   private final long minMemoryAllocation;
   private final Map<InternalParquetRecordWriter, Long> writerList = new
       HashMap<InternalParquetRecordWriter, Long>();
+  private final Map<String, Runnable> callBacks = new HashMap<String, Runnable>();
+  private double scale = 1.0;
 
   public MemoryManager(float ratio, long minAllocation) {
     checkRatio(ratio);
@@ -100,7 +104,6 @@ public class MemoryManager {
    */
   private void updateAllocation() {
     long totalAllocations = 0;
-    double scale;
     for (Long allocation : writerList.values()) {
       totalAllocations += allocation;
     }
@@ -112,6 +115,10 @@ public class MemoryManager {
           "Total allocation exceeds %.2f%% (%,d bytes) of heap memory\n" +
           "Scaling row group sizes to %.2f%% for %d writers",
           100*memoryPoolRatio, totalMemoryPool, 100*scale, writerList.size()));
+      for (Runnable callBack : callBacks.values()) {
+        // we do not really want to start a new thread here.
+        callBack.run();
+      }
     }
 
     int maxColCount = 0;
@@ -155,4 +162,37 @@ public class MemoryManager {
   float getMemoryPoolRatio() {
     return memoryPoolRatio;
   }
+
+  /**
+   * Register callback and deduplicate it if any.
+   * @param callBackName the name of callback. It should be identical.
+   * @param callBack the callback passed in from upper layer, such as Hive.
+   */
+  public void registerScaleCallBack(String callBackName, Runnable callBack) {
+    Preconditions.checkNotNull(callBackName, "callBackName");
+    Preconditions.checkNotNull(callBack, "callBack");
+
+    if (callBacks.containsKey(callBackName)) {
+      throw new IllegalArgumentException("The callBackName " + callBackName +
+          " is duplicated and has been registered already.");
+    } else {
+      callBacks.put(callBackName, callBack);
+    }
+  }
+
+  /**
+   * Get the registered callbacks.
+   * @return
+   */
+  Map<String, Runnable> getScaleCallBacks() {
+    return Collections.unmodifiableMap(callBacks);
+  }
+
+  /**
+   * Get the internal scale value of MemoryManger
+   * @return
+   */
+  double getScale() {
+    return scale;
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/181affd5/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index d849843..ea3101a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -347,7 +347,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
    */
   private static MemoryManager memoryManager;
 
-  static MemoryManager getMemoryManager() {
+  public static MemoryManager getMemoryManager() {
     return memoryManager;
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/181affd5/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
index fd56957..bbe7443 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
@@ -48,6 +48,8 @@ public class TestMemoryManager {
   int rowGroupSize;
   ParquetOutputFormat parquetOutputFormat;
   CompressionCodecName codec;
+  int counter = 0;
+  boolean firstRegister = true;
 
   @Before
   public void setUp() {
@@ -87,12 +89,36 @@ public class TestMemoryManager {
     //Verify the memory pool
     Assert.assertEquals("memory pool size is incorrect.", expectPoolSize,
         parquetOutputFormat.getMemoryManager().getTotalMemoryPool());
+
+    //Verify Callback mechanism
+    Assert.assertEquals("counter calculated by callback is incorrect.", 1, counter);
+    Assert.assertEquals("CallBack is duplicated.", 1, parquetOutputFormat.getMemoryManager()
+        .getScaleCallBacks().size());
   }
 
   private RecordWriter createWriter(int index) throws Exception{
     Path file = new Path("target/test/", "parquet" + index);
     parquetOutputFormat = new ParquetOutputFormat(new GroupWriteSupport());
-    return parquetOutputFormat.getRecordWriter(conf, file, codec);
+    RecordWriter writer = parquetOutputFormat.getRecordWriter(conf, file, codec);
+    try {
+      parquetOutputFormat.getMemoryManager().registerScaleCallBack("increment-test-counter",
+          new Runnable() {
+            @Override
+            public void run() {
+              counter++;
+            }
+          });
+      if (!firstRegister) {
+        Assert.fail("Duplicated registering callback should throw duplicates exception.");
+      }
+      firstRegister = false;
+    } catch (IllegalArgumentException e) {
+      if (firstRegister) {
+        Assert.fail("Registering the same callback first time should succeed.");
+      }
+    }
+
+    return writer;
   }
 
   private void verifyRowGroupSize(int expectRowGroupSize) {


Mime
View raw message