tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-1568: Apply UnpooledByteBufAllocator when a tajo.test.enabled is set to enable.
Date Mon, 20 Apr 2015 02:15:26 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.10.1 4a02456d3 -> 5e1fa93b5


TAJO-1568: Apply UnpooledByteBufAllocator when a tajo.test.enabled is set to enable.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5e1fa93b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5e1fa93b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5e1fa93b

Branch: refs/heads/branch-0.10.1
Commit: 5e1fa93b53cc4b575996a4aceaeb781567dc47d6
Parents: 4a02456
Author: Jinho Kim <jhkim@apache.org>
Authored: Mon Apr 20 11:13:14 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Mon Apr 20 11:13:14 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../java/org/apache/tajo/util/NumberUtil.java   | 26 ++++----
 .../java/org/apache/tajo/QueryTestCaseBase.java |  4 +-
 .../org/apache/tajo/storage/BufferPool.java     | 67 +++++++++++++++++---
 4 files changed, 79 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/5e1fa93b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index fd133d5..806da48 100644
--- a/CHANGES
+++ b/CHANGES
@@ -83,6 +83,9 @@ Release 0.10.1 - unreleased
   
   TASKS
 
+    TAJO-1568: Apply UnpooledByteBufAllocator when a tajo.test.enabled 
+    is set to enable. (jinho)
+
     TAJO-1567: Update old license in some pom.xml files.
     (Contributed by Dongjoon Hyun, Committed by jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e1fa93b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
index 9e16cec..0d70cc2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
@@ -604,14 +604,14 @@ public class NumberUtil {
    * @throws NumberFormatException if the argument could not be parsed as a double
    */
   public static double parseDouble(ByteBuf bytes, int start, int length) {
-    if (!PlatformDependent.hasUnsafe()) {
-      return parseDouble(bytes.array(), start, length);
-    }
-
     if (bytes == null) {
       throw new NumberFormatException("String is null");
     }
 
+    if (!bytes.hasMemoryAddress()) {
+      return parseDouble(bytes.array(), start, length);
+    }
+
     if (length == 0 || bytes.writerIndex() < start + length) {
       throw new NumberFormatException("Empty string or Invalid buffer!");
     }
@@ -815,13 +815,14 @@ public class NumberUtil {
    * @throws NumberFormatException if the argument could not be parsed as an int quantity.
    */
   public static int parseInt(ByteBuf bytes, int start, int length, int radix) {
-    if (!PlatformDependent.hasUnsafe()) {
-      return parseInt(bytes.array(), start, length);
-    }
-
     if (bytes == null) {
       throw new NumberFormatException("String is null");
     }
+
+    if (!bytes.hasMemoryAddress()) {
+      return parseInt(bytes.array(), start, length);
+    }
+
     if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) {
       throw new NumberFormatException("Invalid radix: " + radix);
     }
@@ -942,13 +943,14 @@ public class NumberUtil {
    * @throws NumberFormatException if the argument could not be parsed as an long quantity.
    */
   public static long parseLong(ByteBuf bytes, int start, int length, int radix) {
-    if (!PlatformDependent.hasUnsafe()) {
-      return parseInt(bytes.array(), start, length);
-    }
-
     if (bytes == null) {
       throw new NumberFormatException("String is null");
     }
+
+    if (!bytes.hasMemoryAddress()) {
+      return parseInt(bytes.array(), start, length);
+    }
+
     if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) {
       throw new NumberFormatException("Invalid radix: " + radix);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e1fa93b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 15fbdae..ddfa7a6 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -212,7 +212,9 @@ public class QueryTestCaseBase {
   @Before
   public void printTestName() {
     /* protect a travis stalled build */
-    System.out.println("Run: " + name.getMethodName());
+    System.out.println("Run: " + name.getMethodName() +
+         " Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())
+        / (1024 * 1024)) + "MBytes");
   }
 
   public QueryTestCaseBase() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e1fa93b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
index 85c79fa..d611ee3 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
@@ -19,24 +19,75 @@
 package org.apache.tajo.storage;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
 import io.netty.util.internal.PlatformDependent;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.CommonTestingUtil;
+
+import java.lang.reflect.Field;
 
 /* this class is PooledBuffer holder */
 public class BufferPool {
 
-  private static final PooledByteBufAllocator allocator;
+  public static final String ALLOW_CACHE = "tajo.storage.buffer.thread-local.cache";
+  private static final ByteBufAllocator ALLOCATOR;
 
   private BufferPool() {
   }
 
   static {
-    //TODO we need determine the default params
-    allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
+    /* TODO Enable thread cache
+    *  Create a pooled ByteBuf allocator but disables the thread-local cache.
+    *  Because the TaskRunner thread is newly created
+    * */
+
+    if (System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE"))
{
+      /* Disable pooling buffers for memory usage  */
+      ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
+
+      /* if you are finding memory leak, please enable this line */
+      ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
+    } else {
+      TajoConf tajoConf = new TajoConf();
+      ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false),
0);
+    }
+  }
+
+  /**
+   * borrowed from Spark
+   */
+  public static PooledByteBufAllocator createPooledByteBufAllocator(
+      boolean allowDirectBufs,
+      boolean allowCache,
+      int numCores) {
+    if (numCores == 0) {
+      numCores = Runtime.getRuntime().availableProcessors();
+    }
+    return new PooledByteBufAllocator(
+        allowDirectBufs && PlatformDependent.directBufferPreferred(),
+        Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
+        Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores
: 0),
+        getPrivateStaticField("DEFAULT_PAGE_SIZE"),
+        getPrivateStaticField("DEFAULT_MAX_ORDER"),
+        allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
+        allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
+        allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
+    );
+  }
 
-    /* if you are finding memory leak, please enable this line */
-    //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
+  /** Used to get defaults from Netty's private static fields. */
+  private static int getPrivateStaticField(String name) {
+    try {
+      Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
+      f.setAccessible(true);
+      return f.getInt(null);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public static long maxDirectMemory() {
@@ -44,8 +95,8 @@ public class BufferPool {
   }
 
 
-  public synchronized static ByteBuf directBuffer(int size) {
-    return allocator.directBuffer(size);
+  public static ByteBuf directBuffer(int size) {
+    return ALLOCATOR.directBuffer(size);
   }
 
   /**
@@ -55,7 +106,7 @@ public class BufferPool {
    * @return allocated ByteBuf from pool
    */
   public static ByteBuf directBuffer(int size, int max) {
-    return allocator.directBuffer(size, max);
+    return ALLOCATOR.directBuffer(size, max);
   }
 
   @InterfaceStability.Unstable


Mime
View raw message