tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [06/32] incubator-tinkerpop git commit: Fixed bug with driver connecting to an initially dead server.
Date Fri, 26 Feb 2016 17:51:35 GMT
Fixed bug with driver connecting to an initially dead server.

The ConnectionPool wasn't getting initialized properly when the host was dead, which then led
to a null pointer exception when trying to reconnect.  Also fixed the possibility of a connection
leak if the pool was partially initialized but the host died in the middle of that. Those
partial connections were never properly closed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/37f30339
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/37f30339
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/37f30339

Branch: refs/heads/TINKERPOP-1166
Commit: 37f30339cba07d084ed66a33f6f6e05936d18b2a
Parents: 6f27995
Author: Stephen Mallette <spmva@genoprime.com>
Authored: Mon Feb 22 16:01:44 2016 -0500
Committer: Stephen Mallette <spmva@genoprime.com>
Committed: Mon Feb 22 16:01:44 2016 -0500

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   2 +
 .../gremlin/driver/ConnectionPool.java          |   7 +-
 .../gremlin/driver/LoadBalancingStrategy.java   |  10 +-
 .../server/GremlinDriverIntegrateTest.java      | 114 ++++++++++++-------
 4 files changed, 86 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/37f30339/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index b35efcb..37cee21 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,8 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.1.2 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Fixed a condition where `ConnectionPool` initialization in the driver would present a `NullPointerException`
on initialization if there were errors constructing the pool in full.
+* Fixed a bug in the round-robin load balancing strategy in the driver would waste requests
potentially sending messages to dead hosts.
 * Fixed a bug where multiple "close" requests were being sent by the driver on `Client.close()`.
 * Fixed an `Property` attach bug that shows up in serialization-based `GraphComputer` implementations.
 * Fixed a pom.xml bug where Gremlin Console/Server were not pulling the latest Neo4j 2.3.2.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/37f30339/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 19a29e4..f18d25e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -89,19 +89,18 @@ final class ConnectionPool {
         this.maxSimultaneousUsagePerConnection = settings.maxSimultaneousUsagePerConnection;
         this.minInProcess = settings.minInProcessPerConnection;
 
-        final List<Connection> l = new ArrayList<>(minPoolSize);
+        this.connections = new CopyOnWriteArrayList<>();
 
         try {
             for (int i = 0; i < minPoolSize; i++)
-                l.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+                this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
         } catch (ConnectionException ce) {
             // ok if we don't get it initialized here - when a request is attempted in a
connection from the
             // pool it will try to create new connections as needed.
-            logger.debug("Could not initialize connections in pool for {} - pool size at
{}", host, l.size());
+            logger.debug("Could not initialize connections in pool for {} - pool size at
{}", host, this.connections.size());
             considerUnavailable();
         }
 
-        this.connections = new CopyOnWriteArrayList<>(l);
         this.open = new AtomicInteger(connections.size());
 
         logger.info("Opening connection pool on {} with core size of {}", host, minPoolSize);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/37f30339/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
index e2ced77..b485911 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -61,7 +62,14 @@ public interface LoadBalancingStrategy extends Host.Listener {
 
         @Override
         public Iterator<Host> select(final RequestMessage msg) {
-            final List<Host> hosts = (List<Host>) availableHosts.clone();
+            final List<Host> hosts = new ArrayList<>();
+
+            // a host could be marked as dead in which case we dont need to send messages
to it - just skip it for
+            // now. it might come back online later
+            availableHosts.iterator().forEachRemaining(host -> {
+                if (host.isAvailable()) hosts.add(host);
+            });
+
             final int startIndex = index.getAndIncrement();
 
             if (startIndex > Integer.MAX_VALUE - 10000)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/37f30339/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 125afa9..609055f 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -53,6 +54,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -90,7 +92,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
                 try {
                     final String p = TestHelper.generateTempFileFromResource(
                             GremlinDriverIntegrateTest.class, "generate-shouldRebindTraversalSourceVariables.groovy",
"").getAbsolutePath();
-                    settings.scriptEngines.get("gremlin-groovy").scripts = Arrays.asList(p);
+                    settings.scriptEngines.get("gremlin-groovy").scripts = Collections.singletonList(p);
                 } catch (Exception ex) {
                     throw new RuntimeException(ex);
                 }
@@ -127,6 +129,47 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     }
 
     @Test
+    public void shouldEventuallySucceedOnSameServer() throws Exception {
+        stopServer();
+
+        final Cluster cluster = Cluster.build().addContactPoint("localhost").create();
+        final Client client = cluster.connect();
+
+        try {
+            client.submit("1+1").all().join().get(0).getInt();
+            fail("Should not have gone through because the server is not running");
+        } catch (Exception i) {
+            final Throwable root = ExceptionUtils.getRootCause(i);
+            assertThat(root, instanceOf(TimeoutException.class));
+        }
+
+        startServer();
+
+        // default reconnect time is 1 second so wait some extra time to be sure it has time
to try to bring it
+        // back to life
+        TimeUnit.SECONDS.sleep(3);
+        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldEventuallySucceedWithRoundRobin() throws Exception {
+        final String noGremlinServer = "74.125.225.19";
+        final Cluster cluster = Cluster.build(noGremlinServer).addContactPoint("localhost").create();
+        final Client client = cluster.connect();
+
+        // the first host is dead on init.  request should succeed on localhost
+        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+
+        cluster.close();
+    }
+
+    @Test
     public void shouldHandleResultsOfAllSizes() throws Exception {
         final Cluster cluster = Cluster.open();
         final Client client = cluster.connect();
@@ -393,19 +436,18 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         cluster.close();
     }
 
-    /**
-     * This test arose from this issue: https://github.org/apache/tinkerpop/tinkerpop3/issues/515
-     * <p/>
-     * ResultSet.all returns a CompletableFuture that blocks on the worker pool until isExhausted
returns false.
-     * isExhausted in turn needs a thread on the worker pool to even return. So its totally
possible to consume all
-     * threads on the worker pool waiting for .all to finish such that you can't even get
one to wait for
-     * isExhausted to run.
-     * <p/>
-     * Note that all() doesn't work as described above anymore.  It waits for callback on
readComplete rather
-     * than blocking on isExhausted.
-     */
     @Test
     public void shouldAvoidDeadlockOnCallToResultSetDotAll() throws Exception {
+
+        // This test arose from this issue: https://github.org/apache/tinkerpop/tinkerpop3/issues/515
+        //
+        // ResultSet.all returns a CompletableFuture that blocks on the worker pool until
isExhausted returns false.
+        // isExhausted in turn needs a thread on the worker pool to even return. So its totally
possible to consume all
+        // threads on the worker pool waiting for .all to finish such that you can't even
get one to wait for
+        // isExhausted to run.
+        //
+        // Note that all() doesn't work as described above anymore.  It waits for callback
on readComplete rather
+        // than blocking on isExhausted.
         final int workerPoolSizeForDriver = 2;
 
         // the number of requests 4 times the size of the worker pool as this originally
did produce the problem
@@ -462,27 +504,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     }
 
     @Test
-    public void shouldHandleRequestSentThatNeverReturns() throws Exception {
-        final Cluster cluster = Cluster.open();
-        final Client client = cluster.connect();
-
-        final ResultSet results = client.submit("Thread.sleep(10000); 'should-not-ever-get-back-coz-we-killed-the-server'");
-
-        stopServer();
-
-        try {
-            results.getAvailableItemCount();
-            fail("Server was stopped before the request could execute");
-        } catch (Exception ex) {
-            final Throwable cause = ex.getCause();
-            assertThat(cause, instanceOf(ResponseException.class));
-            assertThat(cause.getMessage(), containsString("rejected from java.util.concurrent.ThreadPoolExecutor"));
-        }
-
-        cluster.close();
-    }
-
-    @Test
     public void shouldFailWithBadServerSideSerialization() throws Exception {
         final Cluster cluster = Cluster.open();
         final Client client = cluster.connect();
@@ -575,17 +596,26 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     }
 
     @Test
-    public void shouldEventuallySucceedWithRoundRobin() throws Exception {
-        final String noGremlinServer = "74.125.225.19";
-        final Cluster cluster = Cluster.build(noGremlinServer).addContactPoint("localhost").create();
+    public void shouldHandleRequestSentThatNeverReturns() throws Exception {
+        final Cluster cluster = Cluster.open();
         final Client client = cluster.connect();
 
-        // the first host is dead on init.  request should succeed on localhost
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+        final ResultSet results = client.submit("Thread.sleep(10000); 'should-not-ever-get-back-coz-we-killed-the-server'");
+
+        stopServer();
+
+        // give the server a chance to kill everything
+        Thread.sleep(1000);
+
+        try {
+            results.all().get(10000, TimeUnit.MILLISECONDS);
+            fail("Server was stopped before the request could execute");
+        } catch (TimeoutException toe) {
+            fail("Should not have tossed a TimeOutException getting the result");
+        } catch (Exception ex) {
+            final Throwable cause = ExceptionUtils.getCause(ex);
+            assertThat(cause.getMessage(), containsString("rejected from java.util.concurrent.ThreadPoolExecutor"));
+        }
 
         cluster.close();
     }


Mime
View raw message