drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [1/6] drill git commit: DRILL-4131: Move RPC allocators under Drill's root allocator & accounting
Date Wed, 20 Jan 2016 15:31:09 GMT
Repository: drill
Updated Branches:
  refs/heads/master 8a28131e5 -> 9cc7e116f


DRILL-4131: Move RPC allocators under Drill's root allocator & accounting

- Allow settings to be set to ensure RPC reservation and maximums (currently unset by default).
Defaults set in drill-module.conf
- Add new metrics to report RPC layer memory consumption.
- Check for memory leaks from RPC layer at shutdown.
- Add a multi-Drillbit single JVM safe DrillMetrics.register()
- Remove invalid verifyAllocator checks while RPC connection (and PING/PONG) are maintained

This closes #327.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/412d08f9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/412d08f9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/412d08f9

Branch: refs/heads/master
Commit: 412d08f9ffd28b877eab2130f2b3f907c5cd43ca
Parents: 8a28131
Author: Jacques Nadeau <jacques@apache.org>
Authored: Fri Jan 15 15:53:13 2016 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Wed Jan 20 07:29:05 2016 -0800

----------------------------------------------------------------------
 .../apache/drill/exec/metrics/DrillMetrics.java |  8 ++
 .../rpc/control/ConnectionManagerRegistry.java  |  7 +-
 .../drill/exec/rpc/control/ControlClient.java   |  5 +-
 .../rpc/control/ControlConnectionManager.java   |  8 +-
 .../drill/exec/rpc/control/ControllerImpl.java  |  6 +-
 .../exec/rpc/data/DataConnectionCreator.java    | 10 ++-
 .../drill/exec/service/ServiceEngine.java       | 80 ++++++++++++++++++--
 .../src/main/resources/drill-module.conf        | 16 +++-
 .../java/org/apache/drill/BaseTestQuery.java    | 20 -----
 .../java/org/apache/drill/TestTpchLimit0.java   |  6 --
 .../netty/buffer/PooledByteBufAllocatorL.java   |  6 +-
 .../apache/drill/exec/memory/BaseAllocator.java |  8 +-
 12 files changed, 128 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/common/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java b/common/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
index 781c9d6..e99a40f 100644
--- a/common/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
+++ b/common/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
@@ -89,6 +89,14 @@ public class DrillMetrics {
     }
   }
 
+  public synchronized static <T extends Metric> void register(String name, T metric)
{
+    boolean removed = RegistryHolder.REGISTRY.remove(name);
+    if (removed) {
+      logger.warn("Removing old metric since name matched newly registered metric. Metric
name: {}", name);
+    }
+    RegistryHolder.REGISTRY.register(name, metric);
+  }
+
   private static void registerAll(String prefix, MetricSet metricSet, MetricRegistry registry)
{
     for (Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
       if (entry.getValue() instanceof MetricSet) {

http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
index 06d6e77..1ac30e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.rpc.control;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
@@ -34,18 +35,20 @@ public class ConnectionManagerRegistry implements Iterable<ControlConnectionMana
   private final ControlMessageHandler handler;
   private final BootStrapContext context;
   private volatile DrillbitEndpoint localEndpoint;
+  private final BufferAllocator allocator;
 
-  public ConnectionManagerRegistry(ControlMessageHandler handler, BootStrapContext context)
{
+  public ConnectionManagerRegistry(BufferAllocator allocator, ControlMessageHandler handler,
BootStrapContext context) {
     super();
     this.handler = handler;
     this.context = context;
+    this.allocator = allocator;
   }
 
   public ControlConnectionManager getConnectionManager(DrillbitEndpoint endpoint) {
     assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager
can be retrieved";
     ControlConnectionManager m = registry.get(endpoint);
     if (m == null) {
-      m = new ControlConnectionManager(endpoint, localEndpoint, handler, context);
+      m = new ControlConnectionManager(allocator, endpoint, localEndpoint, handler, context);
       ControlConnectionManager m2 = registry.putIfAbsent(endpoint, m);
       if (m2 != null) {
         m = m2;

http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index cc238f5..c5bf6b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -49,10 +49,11 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection,
BitCo
   private final DrillbitEndpoint localIdentity;
   private final BufferAllocator allocator;
 
-  public ControlClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, ControlMessageHandler
handler,
+  public ControlClient(BufferAllocator allocator, DrillbitEndpoint remoteEndpoint, DrillbitEndpoint
localEndpoint,
+      ControlMessageHandler handler,
       BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory)
{
     super(ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
-        context.getAllocator().getAsByteBufAllocator(),
+        allocator.getAsByteBufAllocator(),
         context.getBitLoopGroup(),
         RpcType.HANDSHAKE,
         BitControlHandshake.class,

http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
index 60af2a9..611b727 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.BasicClient;
@@ -34,13 +35,16 @@ public class ControlConnectionManager extends ReconnectingConnection<ControlConn
   private final ControlMessageHandler handler;
   private final BootStrapContext context;
   private final DrillbitEndpoint localIdentity;
+  private final BufferAllocator allocator;
 
-  public ControlConnectionManager(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localIdentity,
ControlMessageHandler handler, BootStrapContext context) {
+  public ControlConnectionManager(BufferAllocator allocator, DrillbitEndpoint remoteEndpoint,
+      DrillbitEndpoint localIdentity, ControlMessageHandler handler, BootStrapContext context)
{
     super(BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(),
remoteEndpoint.getAddress(), remoteEndpoint.getControlPort());
     assert remoteEndpoint != null : "Endpoint cannot be null.";
     assert remoteEndpoint.getAddress() != null && !remoteEndpoint.getAddress().isEmpty():
"Endpoint address cannot be null.";
     assert remoteEndpoint.getControlPort() > 0 : String.format("Bit Port must be set to
a port between 1 and 65k.  Was set to %d.", remoteEndpoint.getControlPort());
 
+    this.allocator = allocator;
     this.endpoint = remoteEndpoint;
     this.localIdentity = localIdentity;
     this.handler = handler;
@@ -49,7 +53,7 @@ public class ControlConnectionManager extends ReconnectingConnection<ControlConn
 
   @Override
   protected BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient()
{
-    return new ControlClient(endpoint, localIdentity, handler, context, new CloseHandlerCreator());
+    return new ControlClient(allocator, endpoint, localIdentity, handler, context, new CloseHandlerCreator());
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index 0564cca..a22d207 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc.control;
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
@@ -40,11 +41,12 @@ public class ControllerImpl implements Controller {
   private final boolean allowPortHunting;
   private final CustomHandlerRegistry handlerRegistry;
 
-  public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, boolean
allowPortHunting) {
+  public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, BufferAllocator
allocator,
+      boolean allowPortHunting) {
     super();
     this.handler = handler;
     this.context = context;
-    this.connectionRegistry = new ConnectionManagerRegistry(handler, context);
+    this.connectionRegistry = new ConnectionManagerRegistry(allocator, handler, context);
     this.allowPortHunting = allowPortHunting;
     this.handlerRegistry = handler.getHandlerRegistry();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index b2a51ca..d1ba92b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -44,14 +44,18 @@ public class DataConnectionCreator implements Closeable {
   private ConcurrentMap<DrillbitEndpoint, DataConnectionManager> connectionManager
= Maps.newConcurrentMap();
   private final BufferAllocator dataAllocator;
 
-  public DataConnectionCreator(BootStrapContext context, WorkEventBus workBus, WorkerBee
bee, boolean allowPortHunting) {
+  public DataConnectionCreator(
+      BootStrapContext context,
+      BufferAllocator allocator,
+      WorkEventBus workBus,
+      WorkerBee bee,
+      boolean allowPortHunting) {
     super();
     this.context = context;
     this.workBus = workBus;
     this.bee = bee;
     this.allowPortHunting = allowPortHunting;
-    this.dataAllocator = context.getAllocator()
-        .newChildAllocator("rpc-data", 0, Long.MAX_VALUE);
+    this.dataAllocator = allocator;
   }
 
   public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException
{

http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index e07ca90..6444cb8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -18,10 +18,10 @@
 package org.apache.drill.exec.service;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import io.netty.buffer.PooledByteBufAllocatorL;
 import io.netty.channel.EventLoopGroup;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.Executor;
@@ -29,9 +29,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.TransportCheck;
 import org.apache.drill.exec.rpc.control.Controller;
@@ -44,10 +47,12 @@ import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 import org.apache.drill.exec.work.user.UserWorker;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Stopwatch;
 import com.google.common.io.Closeables;
 
-public class ServiceEngine implements Closeable{
+public class ServiceEngine implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ServiceEngine.class);
 
   private final UserServer userServer;
@@ -56,22 +61,83 @@ public class ServiceEngine implements Closeable{
   private final DrillConfig config;
   boolean useIP = false;
   private final boolean allowPortHunting;
+  private final BufferAllocator userAllocator;
+  private final BufferAllocator controlAllocator;
+  private final BufferAllocator dataAllocator;
+
 
   public ServiceEngine(ControlMessageHandler controlMessageHandler, UserWorker userWorker,
BootStrapContext context,
       WorkEventBus workBus, WorkerBee bee, boolean allowPortHunting) throws DrillbitStartupException
{
+    userAllocator = newAllocator(context, "rpc:user", "drill.exec.rpc.user.server.memory.reservation",
+        "drill.exec.rpc.user.server.memory.maximum");
+    controlAllocator = newAllocator(context, "rpc:bit-control",
+        "drill.exec.rpc.bit.server.memory.control.reservation", "drill.exec.rpc.bit.server.memory.control.maximum");
+    dataAllocator = newAllocator(context, "rpc:bit-data",
+        "drill.exec.rpc.bit.server.memory.data.reservation", "drill.exec.rpc.bit.server.memory.data.maximum");
     final EventLoopGroup eventLoopGroup = TransportCheck.createEventLoopGroup(
         context.getConfig().getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-");
     this.userServer = new UserServer(
         context.getConfig(),
         context.getClasspathScan(),
-        context.getAllocator(),
+        userAllocator,
         eventLoopGroup,
         userWorker,
         context.getExecutor());
-    this.controller = new ControllerImpl(context, controlMessageHandler, allowPortHunting);
-    this.dataPool = new DataConnectionCreator(context, workBus, bee, allowPortHunting);
+    this.controller = new ControllerImpl(context, controlMessageHandler, controlAllocator,
allowPortHunting);
+    this.dataPool = new DataConnectionCreator(context, dataAllocator, workBus, bee, allowPortHunting);
     this.config = context.getConfig();
     this.allowPortHunting = allowPortHunting;
+    registerMetrics(context.getMetrics());
+
+  }
+
+  private final void registerMetrics(final MetricRegistry registry) {
+    final String prefix = PooledByteBufAllocatorL.METRIC_PREFIX + "rpc.";
+    DrillMetrics.register(prefix + "user.current", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return userAllocator.getAllocatedMemory();
+      }
+    });
+    DrillMetrics.register(prefix + "user.peak", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return userAllocator.getPeakMemoryAllocation();
+      }
+    });
+    DrillMetrics.register(prefix + "bit.control.current", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return controlAllocator.getAllocatedMemory();
+      }
+    });
+    DrillMetrics.register(prefix + "bit.control.peak", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return controlAllocator.getPeakMemoryAllocation();
+      }
+    });
+
+    DrillMetrics.register(prefix + "bit.data.current", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return dataAllocator.getAllocatedMemory();
+      }
+    });
+    DrillMetrics.register(prefix + "bit.data.peak", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return dataAllocator.getPeakMemoryAllocation();
+      }
+    });
+
+  }
+
+
+  private static BufferAllocator newAllocator(
+      BootStrapContext context, String name, String initReservation, String maxAllocation)
{
+    return context.getAllocator().newChildAllocator(
+        name, context.getConfig().getLong(initReservation), context.getConfig().getLong(maxAllocation));
   }
 
   public DrillbitEndpoint start() throws DrillbitStartupException, UnknownHostException{
@@ -110,7 +176,7 @@ public class ServiceEngine implements Closeable{
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() throws Exception {
     // this takes time so close them in parallel
     // Ideally though we fix this netty bug: https://github.com/netty/netty/issues/2545
     ExecutorService p = Executors.newFixedThreadPool(2);
@@ -123,5 +189,7 @@ public class ServiceEngine implements Closeable{
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }
+    AutoCloseables.close(userAllocator, controlAllocator, dataAllocator);
+
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 10c5319..d6ba99a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -52,7 +52,11 @@ drill.exec: {
       timeout: 30,
       server: {
         port: 31010
-        threads: 1
+        threads: 1,
+        memory: {
+          reservation: 0,
+          maximum: 9223372036854775807
+        }
       }
       client: {
         threads: 1
@@ -67,6 +71,16 @@ drill.exec: {
           delay: 500
         },
         threads: 10
+        memory: {
+          control: {
+            reservation: 0,
+            maximum: 9223372036854775807
+          },
+          data: {
+            reservation: 0,
+            maximum: 9223372036854775807
+          }
+        }
       }
     },
     use.ip : false

http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 2077d6e..26b7464 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -18,11 +18,9 @@
 package org.apache.drill;
 
 import static org.hamcrest.core.StringContains.containsString;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -40,7 +38,6 @@ import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -250,23 +247,6 @@ public class BaseTestQuery extends ExecTest {
     return new TestBuilder(allocator);
   }
 
-  /**
-   * Utility function that can be used in tests to verify the state of drillbit
-   * allocators.
-   */
-  public static void verifyAllocators() {
-    if (bits != null) {
-      for(Drillbit bit : bits) {
-        if (bit != null) {
-          final DrillbitContext drillbitContext = bit.getContext();
-          final BufferAllocator bufferAllocator = drillbitContext.getAllocator();
-          final RootAllocator rootAllocator = (RootAllocator) bufferAllocator;
-          rootAllocator.verify();
-        }
-      }
-    }
-  }
-
   @AfterClass
   public static void closeClient() throws IOException {
     if (client != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
index 6d2fbf0..22471c8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
@@ -17,18 +17,12 @@
  */
 package org.apache.drill;
 
-import org.junit.After;
 import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestTpchLimit0 extends BaseTestQuery{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchLimit0.class);
 
-  @After
-  public void checkForLeaks() {
-    verifyAllocators();
-  }
-
   private void testLimitZero(String fileName) throws Exception {
     String query = getFile(fileName);
     query = "ALTER SESSION SET `planner.slice_target` = 1; select * from \n(" + query.replace(";",
")xyz limit 0;");

http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index 47dbf59..1ee7964 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -40,7 +40,7 @@ public class PooledByteBufAllocatorL {
   private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
 
 
-  private static final String METRIC_PREFIX = "drill.allocator.";
+  public static final String METRIC_PREFIX = "drill.allocator.";
 
   private final MetricRegistry registry;
   private final AtomicLong hugeBufferSize = new AtomicLong(0);
@@ -48,7 +48,7 @@ public class PooledByteBufAllocatorL {
   private final AtomicLong normalBufferSize = new AtomicLong(0);
   private final AtomicLong normalBufferCount = new AtomicLong(0);
 
-  public final InnerAllocator allocator;
+  private final InnerAllocator allocator;
   public final UnsafeDirectLittleEndian empty;
 
   public PooledByteBufAllocatorL(MetricRegistry registry) {
@@ -59,7 +59,7 @@ public class PooledByteBufAllocatorL {
 
   public UnsafeDirectLittleEndian allocate(int size) {
     try {
-      return allocator.directBuffer(size, size);
+      return allocator.directBuffer(size, Integer.MAX_VALUE);
     } catch (OutOfMemoryError e) {
       throw new OutOfMemoryException("Failure allocating buffer.", e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/412d08f9/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index 78c3c73..bf4dc8a 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -84,9 +84,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     this.parentAllocator = parentAllocator;
     this.name = name;
 
-    // TODO: DRILL-4131
-    // this.thisAsByteBufAllocator = new DrillByteBufAllocator(this);
-    this.thisAsByteBufAllocator = AllocatorManager.INNER_ALLOCATOR.allocator;
+    this.thisAsByteBufAllocator = new DrillByteBufAllocator(this);
 
     if (DEBUG) {
       childAllocators = new IdentityHashMap<>();
@@ -598,7 +596,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
         }
         buffersSeen.put(udle, this);
 
-        bufferTotal += udle.maxCapacity();
+        bufferTotal += udle.capacity();
       }
 
       // Preallocated space has to be accounted for
@@ -705,7 +703,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       sb.append("UnsafeDirectLittleEndian[dentityHashCode == ");
       sb.append(Integer.toString(System.identityHashCode(udle)));
       sb.append("] size ");
-      sb.append(Integer.toString(udle.maxCapacity()));
+      sb.append(Integer.toString(udle.capacity()));
       sb.append('\n');
     }
   }


Mime
View raw message