drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [2/6] drill git commit: DRILL-4278: Heap memory leak issues
Date Wed, 20 Jan 2016 15:31:10 GMT
DRILL-4278: Heap memory leak issues

- Fix issue where WorkspaceConfig was not returning consistent hashCode()s for equal objects.
- Fix issue where we were misusing recycler causing object reference leaks

This closes #331.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/43414e1e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/43414e1e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/43414e1e

Branch: refs/heads/master
Commit: 43414e1e2731c85a52febaaedebfa2f392dd6d2f
Parents: 412d08f
Author: Jacques Nadeau <jacques@apache.org>
Authored: Mon Jan 18 17:42:33 2016 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Wed Jan 20 07:29:38 2016 -0800

----------------------------------------------------------------------
 .../exec/store/dfs/NamedFormatPluginConfig.java |  31 +++++
 .../drill/exec/store/dfs/WorkspaceConfig.java   |  43 +++++--
 .../exec/store/easy/json/JSONFormatPlugin.java  |  26 +++-
 .../apache/drill/common/SerializedExecutor.java |  23 +---
 .../java/org/apache/drill/exec/rpc/RpcBus.java  | 118 ++++++-------------
 .../common/logical/FormatPluginConfigBase.java  |   3 +
 6 files changed, 128 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/43414e1e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
index 6a54bb3..035ad46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
@@ -25,4 +25,35 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 @JsonTypeName("named")
 public class NamedFormatPluginConfig implements FormatPluginConfig {
   public String name;
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    NamedFormatPluginConfig other = (NamedFormatPluginConfig) obj;
+    if (name == null) {
+      if (other.name != null) {
+        return false;
+      }
+    } else if (!name.equals(other.name)) {
+      return false;
+    }
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/43414e1e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
index a6ee545..f9b0097 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
@@ -57,18 +57,45 @@ public class WorkspaceConfig {
   }
 
   @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((defaultInputFormat == null) ? 0 : defaultInputFormat.hashCode());
+    result = prime * result + ((location == null) ? 0 : location.hashCode());
+    result = prime * result + (writable ? 1231 : 1237);
+    return result;
+  }
+
+  @Override
   public boolean equals(Object obj) {
-    if (obj == this) {
+    if (this == obj) {
       return true;
     }
-
-    if (obj == null || !(obj instanceof WorkspaceConfig)) {
+    if (obj == null) {
       return false;
     }
-
-    WorkspaceConfig that = (WorkspaceConfig) obj;
-    return ((this.location == null && that.location == null) || this.location.equals(that.location))
&&
-        this.writable == that.writable &&
-        ((this.defaultInputFormat == null && that.defaultInputFormat == null) ||
this.defaultInputFormat.equals(that.defaultInputFormat));
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    WorkspaceConfig other = (WorkspaceConfig) obj;
+    if (defaultInputFormat == null) {
+      if (other.defaultInputFormat != null) {
+        return false;
+      }
+    } else if (!defaultInputFormat.equals(other.defaultInputFormat)) {
+      return false;
+    }
+    if (location == null) {
+      if (other.location != null) {
+        return false;
+      }
+    } else if (!location.equals(other.location)) {
+      return false;
+    }
+    if (writable != other.writable) {
+      return false;
+    }
+    return true;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/43414e1e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 56c6c7d..3f093be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -105,21 +105,35 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig>
{
 
     @Override
     public int hashCode() {
-      return 31;
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((extensions == null) ? 0 : extensions.hashCode());
+      return result;
     }
 
     @Override
     public boolean equals(Object obj) {
       if (this == obj) {
         return true;
-      } else if (obj == null) {
+      }
+      if (obj == null) {
         return false;
-      } else if (getClass() == obj.getClass()) {
-        return true;
       }
-
-      return false;
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      JSONFormatConfig other = (JSONFormatConfig) obj;
+      if (extensions == null) {
+        if (other.extensions != null) {
+          return false;
+        }
+      } else if (!extensions.equals(other.extensions)) {
+        return false;
+      }
+      return true;
     }
+
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/43414e1e/exec/rpc/src/main/java/org/apache/drill/common/SerializedExecutor.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/common/SerializedExecutor.java b/exec/rpc/src/main/java/org/apache/drill/common/SerializedExecutor.java
index 6a3a823..8f3a86e 100644
--- a/exec/rpc/src/main/java/org/apache/drill/common/SerializedExecutor.java
+++ b/exec/rpc/src/main/java/org/apache/drill/common/SerializedExecutor.java
@@ -17,9 +17,6 @@
  */
 package org.apache.drill.common;
 
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-
 import java.util.LinkedList;
 import java.util.concurrent.Executor;
 
@@ -31,13 +28,6 @@ import java.util.concurrent.Executor;
  */
 public abstract class SerializedExecutor implements Executor {
 
-  private final Recycler<RunnableProcessor> processors = new Recycler<RunnableProcessor>()
{
-    @Override
-    protected RunnableProcessor newObject(Handle handle) {
-      return new RunnableProcessor(handle);
-    }
-  };
-
   private boolean isProcessing = false;
   private final LinkedList<Runnable> queuedRunnables = new LinkedList<>();
   private final Executor underlyingExecutor;
@@ -80,17 +70,11 @@ public abstract class SerializedExecutor implements Executor {
   protected abstract void runException(Runnable command, Throwable t);
 
   private class RunnableProcessor implements Runnable {
-    private final Handle handle;
 
     private Runnable command;
 
-    public RunnableProcessor(Handle handle) {
-      this.handle = handle;
-    }
-
-    public Runnable set(Runnable command) {
+    public RunnableProcessor(Runnable command) {
       this.command = command;
-      return this;
     }
 
     @Override
@@ -110,7 +94,6 @@ public abstract class SerializedExecutor implements Executor {
               throw new IllegalStateException("Exception handler threw an exception", ee);
             }
           }
-
           synchronized (queuedRunnables) {
             if (queuedRunnables.isEmpty()) {
               isProcessing = false;
@@ -122,8 +105,6 @@ public abstract class SerializedExecutor implements Executor {
         }
       } finally {
         currentThread.setName(originalThreadName);
-        command = null;
-        processors.recycle(this, handle);
       }
     }
   }
@@ -139,6 +120,6 @@ public abstract class SerializedExecutor implements Executor {
       isProcessing = true;
     }
 
-    underlyingExecutor.execute(processors.get().set(command));
+    underlyingExecutor.execute(new RunnableProcessor(command));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/43414e1e/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index b6f3032..acfb862 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -25,8 +25,6 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.MessageToMessageDecoder;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
@@ -193,10 +191,8 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
     private RemoteConnection connection;
     private int coordinationId;
     private final AtomicBoolean sent = new AtomicBoolean(false);
-    private final Recyclable recyclable;
 
-    public ResponseSenderImpl(Recyclable recyclable) {
-      this.recyclable = recyclable;
+    public ResponseSenderImpl() {
     }
 
     void set(RemoteConnection connection, int coordinationId){
@@ -206,19 +202,15 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
     }
 
     public void send(Response r) {
-      try {
-        assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass());
-        sendOnce();
-        OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType,
coordinationId,
-            r.pBody, r.dBodies);
-        if (RpcConstants.EXTRA_DEBUGGING) {
-          logger.debug("Adding message to outbound buffer. {}", outMessage);
-        }
-        logger.debug("Sending response with Sender {}", System.identityHashCode(this));
-        connection.getChannel().writeAndFlush(outMessage);
-      } finally {
-        recyclable.recycle();
+      assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass());
+      sendOnce();
+      OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType,
coordinationId,
+          r.pBody, r.dBodies);
+      if (RpcConstants.EXTRA_DEBUGGING) {
+        logger.debug("Adding message to outbound buffer. {}", outMessage);
       }
+      logger.debug("Sending response with Sender {}", System.identityHashCode(this));
+      connection.getChannel().writeAndFlush(outMessage);
     }
 
     /**
@@ -231,28 +223,24 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
     }
 
     void sendFailure(UserRpcException e){
-      try {
-        sendOnce();
-        UserException uex = UserException.systemError(e)
-            .addIdentity(e.getEndpoint())
-            .build(logger);
+      sendOnce();
+      UserException uex = UserException.systemError(e)
+          .addIdentity(e.getEndpoint())
+          .build(logger);
 
-        logger.error("Unexpected Error while handling request message", e);
+      logger.error("Unexpected Error while handling request message", e);
 
-        OutboundRpcMessage outMessage = new OutboundRpcMessage(
-            RpcMode.RESPONSE_FAILURE,
-            0,
-            coordinationId,
-            uex.getOrCreatePBError(false)
-            );
+      OutboundRpcMessage outMessage = new OutboundRpcMessage(
+          RpcMode.RESPONSE_FAILURE,
+          0,
+          coordinationId,
+          uex.getOrCreatePBError(false)
+          );
 
-        if (RpcConstants.EXTRA_DEBUGGING) {
-          logger.debug("Adding message to outbound buffer. {}", outMessage);
-        }
-        connection.getChannel().writeAndFlush(outMessage);
-      } finally {
-        recyclable.recycle();
+      if (RpcConstants.EXTRA_DEBUGGING) {
+        logger.debug("Adding message to outbound buffer. {}", outMessage);
       }
+      connection.getChannel().writeAndFlush(outMessage);
     }
 
   }
@@ -293,14 +281,12 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
 
         switch (msg.mode) {
         case REQUEST:
-          RequestEvent reqEvent = requestRecycler.get();
-          reqEvent.set(msg.coordinationId, connection, msg.rpcType, msg.pBody, msg.dBody);
+          RequestEvent reqEvent = new RequestEvent(msg.coordinationId, connection, msg.rpcType,
msg.pBody, msg.dBody);
           exec.execute(reqEvent);
           break;
 
         case RESPONSE:
-          ResponseEvent respEvent = responseRecycler.get();
-          respEvent.set(msg.rpcType, msg.coordinationId, msg.pBody, msg.dBody);
+          ResponseEvent respEvent = new ResponseEvent(msg.rpcType, msg.coordinationId, msg.pBody,
msg.dBody);
           exec.execute(respEvent);
           break;
 
@@ -358,27 +344,15 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
 
   }
 
-  private final Recycler<RequestEvent> requestRecycler = new Recycler<RequestEvent>()
{
-    @Override
-    protected RequestEvent newObject(Handle handle) {
-      return new RequestEvent(handle);
-    }
-  };
-
-  private class RequestEvent implements Runnable, Recyclable {
+  private class RequestEvent implements Runnable {
     private final ResponseSenderImpl sender;
-    private final Handle handle;
-    private C connection;
-    private int rpcType;
-    private ByteBuf pBody;
-    private ByteBuf dBody;
-
-    RequestEvent(Handle handle){
-      this.handle = handle;
-      sender = new ResponseSenderImpl(this);
-    }
+    private final C connection;
+    private final int rpcType;
+    private final ByteBuf pBody;
+    private final ByteBuf dBody;
 
-    public void set(int coordinationId, C connection, int rpcType, ByteBuf pBody, ByteBuf
dBody) {
+    RequestEvent(int coordinationId, C connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
{
+      sender = new ResponseSenderImpl();
       this.connection = connection;
       this.rpcType = rpcType;
       this.pBody = pBody;
@@ -414,35 +388,18 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
 
     }
 
-    @Override
-    public void recycle() {
-      // We must defer recycling until the sender has been used.
-      requestRecycler.recycle(this, handle);
-    }
-
 
   }
 
-  private final Recycler<ResponseEvent> responseRecycler = new Recycler<ResponseEvent>()
{
-    @Override
-    protected ResponseEvent newObject(Handle handle) {
-      return new ResponseEvent(handle);
-    }
-  };
 
   private class ResponseEvent implements Runnable {
-    private final Handle handle;
 
-    private int rpcType;
-    private int coordinationId;
-    private ByteBuf pBody;
-    private ByteBuf dBody;
-
-    public ResponseEvent(Handle handle){
-      this.handle = handle;
-    }
+    private final int rpcType;
+    private final int coordinationId;
+    private final ByteBuf pBody;
+    private final ByteBuf dBody;
 
-    public void set(int rpcType, int coordinationId, ByteBuf pBody, ByteBuf dBody) {
+    public ResponseEvent(int rpcType, int coordinationId, ByteBuf pBody, ByteBuf dBody) {
       this.rpcType = rpcType;
       this.coordinationId = coordinationId;
       this.pBody = pBody;
@@ -479,7 +436,6 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
           dBody.release();
         }
 
-        responseRecycler.recycle(this, handle);
       }
 
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/43414e1e/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java
----------------------------------------------------------------------
diff --git a/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java
b/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java
index d65520f..6b9dfec 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java
@@ -52,4 +52,7 @@ public abstract class FormatPluginConfigBase implements FormatPluginConfig{
   @Override
   public abstract boolean equals(Object o);
 
+  @Override
+  public abstract int hashCode();
+
 }


Mime
View raw message