drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prog...@apache.org
Subject [2/3] drill git commit: DRILL-5874: NPE in AnonWebUserConnection.cleanupSession()
Date Mon, 30 Oct 2017 20:00:47 GMT
DRILL-5874: NPE in AnonWebUserConnection.cleanupSession()

closes #993


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a26fbec1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a26fbec1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a26fbec1

Branch: refs/heads/master
Commit: a26fbec13134f249e258be6735b82cf09ab1f406
Parents: 8eda4d7
Author: Sorabh Hamirwasia <shamirwasia@maprtech.com>
Authored: Wed Oct 11 15:57:21 2017 -0700
Committer: Paul Rogers <progers@maprtech.com>
Committed: Mon Oct 30 11:43:08 2017 -0700

----------------------------------------------------------------------
 .../drill/exec/server/rest/DrillRestServer.java |  37 +++-
 .../exec/server/rest/WebSessionResources.java   |  16 +-
 .../exec/server/rest/WebUserConnection.java     |  11 +-
 .../server/rest/WebSessionResourcesTest.java    | 168 +++++++++++++++++++
 4 files changed, 219 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a26fbec1/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 6eb47e6..1545847 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -29,6 +29,9 @@ import freemarker.cache.TemplateLoader;
 import freemarker.cache.WebappTemplateLoader;
 import freemarker.core.HTMLOutputFormat;
 import freemarker.template.Configuration;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.EventExecutor;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -108,10 +111,17 @@ public class DrillRestServer extends ResourceConfig {
     provider.setMapper(workManager.getContext().getLpPersistence().getMapper());
     register(provider);
 
+    // Get an EventExecutor out of the BitServer EventLoopGroup to notify listeners for WebUserConnection.
For
+    // actual connections between Drillbits this EventLoopGroup is used to handle network
related events. Though
+    // there is no actual network connection associated with WebUserConnection but we need
a CloseFuture in
+    // WebSessionResources, so we are using EvenExecutor from network EventLoopGroup pool.
+    final EventExecutor executor = workManager.getContext().getBitLoopGroup().next();
+
     register(new AbstractBinder() {
       @Override
       protected void configure() {
         bind(workManager).to(WorkManager.class);
+        bind(executor).to(EventExecutor.class);
         bind(workManager.getContext().getLpPersistence().getMapper()).to(ObjectMapper.class);
         bind(workManager.getContext().getStoreProvider()).to(PersistentStoreProvider.class);
         bind(workManager.getContext().getStorage()).to(StoragePluginRegistry.class);
@@ -159,6 +169,9 @@ public class DrillRestServer extends ResourceConfig {
     @Inject
     WorkManager workManager;
 
+    @Inject
+    EventExecutor executor;
+
     @SuppressWarnings("resource")
     @Override
     public WebUserConnection provide() {
@@ -204,9 +217,15 @@ public class DrillRestServer extends ResourceConfig {
                 config.getLong(ExecConstants.HTTP_SESSION_MEMORY_RESERVATION),
                 config.getLong(ExecConstants.HTTP_SESSION_MEMORY_MAXIMUM));
 
+        // Create a dummy close future which is needed by Foreman only. Foreman uses this
future to add a close
+        // listener to known about channel close event from underlying layer. We use this
future to notify Foreman
+        // listeners when the Web session (not connection) between Web Client and WebServer
is closed. This will help
+        // Foreman to cancel all the running queries for this Web Client.
+        final ChannelPromise closeFuture = new DefaultChannelPromise(null, executor);
+
         // Create a WebSessionResource instance which owns the lifecycle of all the session
resources.
-        // Set this instance as an attribute of HttpSession, since it will be used until
session is destroyed.
-        webSessionResources = new WebSessionResources(sessionAllocator, remoteAddress, drillUserSession);
+        // Set this instance as an attribute of HttpSession, since it will be used until
session is destroyed
+        webSessionResources = new WebSessionResources(sessionAllocator, remoteAddress, drillUserSession,
closeFuture);
         session.setAttribute(WebSessionResources.class.getSimpleName(), webSessionResources);
       }
       // Create a new WebUserConnection for the request
@@ -227,6 +246,9 @@ public class DrillRestServer extends ResourceConfig {
     @Inject
     WorkManager workManager;
 
+    @Inject
+    EventExecutor executor;
+
     @SuppressWarnings("resource")
     @Override
     public WebUserConnection provide() {
@@ -260,8 +282,15 @@ public class DrillRestServer extends ResourceConfig {
         logger.trace("Failed to get the remote address of the http session request", ex);
       }
 
-      final WebSessionResources webSessionResources = new WebSessionResources(sessionAllocator,
-              remoteAddress, drillUserSession);
+      // Create a dummy close future which is needed by Foreman only. Foreman uses this future
to add a close
+      // listener to known about channel close event from underlying layer.
+      //
+      // The invocation of this close future is no-op as it will be triggered after query
completion in unsecure case.
+      // But we need this close future as it's expected by Foreman.
+      final ChannelPromise closeFuture = new DefaultChannelPromise(null, executor);
+
+      final WebSessionResources webSessionResources = new WebSessionResources(sessionAllocator,
remoteAddress,
+          drillUserSession, closeFuture);
 
       // Create a AnonWenUserConnection for this request
       return new AnonWebUserConnection(webSessionResources);

http://git-wip-us.apache.org/repos/asf/drill/blob/a26fbec1/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
index aeed51a..2ca457c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
@@ -19,7 +19,6 @@
 package org.apache.drill.exec.server.rest;
 
 import io.netty.channel.ChannelPromise;
-import io.netty.channel.DefaultChannelPromise;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.ChannelClosedException;
@@ -43,11 +42,12 @@ public class WebSessionResources implements AutoCloseable {
 
   private ChannelPromise closeFuture;
 
-  WebSessionResources(BufferAllocator allocator, SocketAddress remoteAddress, UserSession
userSession) {
+  WebSessionResources(BufferAllocator allocator, SocketAddress remoteAddress,
+                      UserSession userSession, ChannelPromise closeFuture) {
     this.allocator = allocator;
     this.remoteAddress = remoteAddress;
     this.webUserSession = userSession;
-    closeFuture = new DefaultChannelPromise(null);
+    this.closeFuture = closeFuture;
   }
 
   public UserSession getSession() {
@@ -68,16 +68,20 @@ public class WebSessionResources implements AutoCloseable {
 
   @Override
   public void close() {
-
     try {
       AutoCloseables.close(webUserSession, allocator);
     } catch (Exception ex) {
       logger.error("Failure while closing the session resources", ex);
     }
 
-    // Set the close future associated with this session.
+    // Notify all the listeners of this closeFuture for failure events so that listeners
can do cleanup related to this
+    // WebSession. This will be called after every query execution by AnonymousWebUserConnection::cleanupSession
and
+    // for authenticated user it is called when session is invalidated.
+    // For authenticated user it will cancel the in-flight queries based on session invalidation.
Whereas for
+    // unauthenticated user it's a no-op since there is no session associated with it. We
don't have mechanism currently
+    // to call this close future upon Http connection close.
     if (closeFuture != null) {
-      closeFuture.setFailure(new ChannelClosedException("Http Session of the user is closed."));
+      closeFuture.setFailure(new ChannelClosedException("Http connection is closed by Web
Client"));
       closeFuture = null;
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/a26fbec1/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
index bcce9eb..f46b5e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
@@ -42,9 +42,14 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * WebUserConnectionWrapper which represents the UserClientConnection for the WebUser submitting
the query. It provides
- * access to the UserSession executing the query. There is no actual physical channel corresponding
to this connection
- * wrapper.
+ * WebUserConnectionWrapper which represents the UserClientConnection between WebServer and
Foreman, for the WebUser
+ * submitting the query. It provides access to the UserSession executing the query. There
is no actual physical
+ * channel corresponding to this connection wrapper.
+ *
+ * It returns a close future with no actual underlying {@link io.netty.channel.Channel} associated
with it but do have an
+ * EventExecutor out of BitServer EventLoopGroup. Since there is no actual connection established
using this class,
+ * hence the close event will never be fired by underlying layer and close future is set
only when the
+ * {@link WebSessionResources} are closed.
  */
 
 public class WebUserConnection extends AbstractDisposableUserClientConnection implements
ConnectionThrottle {

http://git-wip-us.apache.org/repos/asf/drill/blob/a26fbec1/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
new file mode 100644
index 0000000..bb990de
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest;
+
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.TransportCheck;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.junit.Test;
+
+import java.net.SocketAddress;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Validates {@link WebSessionResources} close works as expected w.r.t {@link io.netty.channel.AbstractChannel.CloseFuture}
+ * associated with it.
+ */
+public class WebSessionResourcesTest {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebSessionResourcesTest.class);
+
+  private WebSessionResources webSessionResources;
+
+  private boolean listenerComplete;
+
+  private CountDownLatch latch;
+
+  private EventExecutor executor;
+
+  // A close listener added in close future in one of the test to see if it's invoked correctly.
+  private class TestClosedListener implements GenericFutureListener<Future<Void>>
{
+    @Override
+    public void operationComplete(Future<Void> future) throws Exception {
+      listenerComplete = true;
+      latch.countDown();
+    }
+  }
+
+  /**
+   * Validates {@link WebSessionResources#close()} throws NPE when closefuture passed to
WebSessionResources doesn't
+   * have a valid channel and EventExecutor associated with it.
+   * @throws Exception
+   */
+  @Test
+  public void testChannelPromiseWithNullExecutor() throws Exception {
+    try {
+      ChannelPromise closeFuture = new DefaultChannelPromise(null);
+      webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class),
mock
+          (UserSession.class), closeFuture);
+      webSessionResources.close();
+      fail();
+    } catch (Exception e) {
+      assertTrue(e instanceof NullPointerException);
+      verify(webSessionResources.getAllocator()).close();
+      verify(webSessionResources.getSession()).close();
+    }
+  }
+
+  /**
+   * Validates successful {@link WebSessionResources#close()} with valid CloseFuture and
other parameters.
+   * @throws Exception
+   */
+  @Test
+  public void testChannelPromiseWithValidExecutor() throws Exception {
+    try {
+      EventExecutor mockExecutor = mock(EventExecutor.class);
+      ChannelPromise closeFuture = new DefaultChannelPromise(null, mockExecutor);
+      webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class),
mock
+          (UserSession.class), closeFuture);
+      webSessionResources.close();
+      verify(webSessionResources.getAllocator()).close();
+      verify(webSessionResources.getSession()).close();
+      verify(mockExecutor).inEventLoop();
+      verify(mockExecutor).execute(any(Runnable.class));
+      assertTrue(webSessionResources.getCloseFuture() == null);
+      assertTrue(!listenerComplete);
+    } catch (Exception e) {
+      fail();
+    }
+  }
+
+  /**
+   * Validates double call to {@link WebSessionResources#close()} doesn't throw any exception.
+   * @throws Exception
+   */
+  @Test
+  public void testDoubleClose() throws Exception {
+    try {
+      ChannelPromise closeFuture = new DefaultChannelPromise(null, mock(EventExecutor.class));
+      webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class),
mock
+          (UserSession.class), closeFuture);
+      webSessionResources.close();
+
+      verify(webSessionResources.getAllocator()).close();
+      verify(webSessionResources.getSession()).close();
+      assertTrue(webSessionResources.getCloseFuture() == null);
+
+      webSessionResources.close();
+    } catch (Exception e) {
+      fail();
+    }
+  }
+
+  /**
+   * Validates successful {@link WebSessionResources#close()} with valid CloseFuture and
{@link TestClosedListener}
+   * getting invoked which is added to the close future.
+   * @throws Exception
+   */
+  @Test
+  public void testCloseWithListener() throws Exception {
+    try {
+      // Assign latch, executor and closeListener for this test case
+      GenericFutureListener<Future<Void>> closeListener = new TestClosedListener();
+      latch = new CountDownLatch(1);
+      executor = TransportCheck.createEventLoopGroup(1, "Test-Thread").next();
+      ChannelPromise closeFuture = new DefaultChannelPromise(null, executor);
+
+      // create WebSessionResources with above ChannelPromise to notify listener
+      webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class),
+          mock(UserSession.class), closeFuture);
+
+      // Add the Test Listener to close future
+      assertTrue(!listenerComplete);
+      closeFuture.addListener(closeListener);
+
+      // Close the WebSessionResources
+      webSessionResources.close();
+
+      // Verify the states
+      verify(webSessionResources.getAllocator()).close();
+      verify(webSessionResources.getSession()).close();
+      assertTrue(webSessionResources.getCloseFuture() == null);
+
+      // Since listener will be invoked so test should not wait forever
+      latch.await();
+      assertTrue(listenerComplete);
+    } catch (Exception e) {
+      fail();
+    } finally {
+      listenerComplete = false;
+      executor.shutdownGracefully();
+    }
+  }
+}
\ No newline at end of file


Mime
View raw message