tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From spmalle...@apache.org
Subject [43/50] tinkerpop git commit: TINKERPOP-1490 Restructured Traversal.promise()
Date Fri, 16 Dec 2016 15:55:44 GMT
TINKERPOP-1490 Restructured Traversal.promise()

Traversal.promise() no longer uses an ExecutorService and is only applicable to "remote" traversals. Moved the
commons-lang3 dependency from gremlin-core back to gremlin-groovy for now.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/ee6a3589
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/ee6a3589
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/ee6a3589

Branch: refs/heads/TINKERPOP-1490
Commit: ee6a35893661b015dbb827463f175ddcecf1bcb8
Parents: eb08976
Author: Stephen Mallette <spmva@genoprime.com>
Authored: Fri Nov 11 12:51:40 2016 -0500
Committer: Stephen Mallette <spmva@genoprime.com>
Committed: Fri Dec 16 10:00:40 2016 -0500

----------------------------------------------------------------------
 gremlin-core/pom.xml                            |   5 -
 .../process/remote/RemoteConnection.java        |  12 +-
 .../remote/traversal/RemoteTraversal.java       |   2 +-
 .../remote/traversal/step/map/RemoteStep.java   |  32 +++-
 .../gremlin/process/traversal/Traversal.java    |  57 ++------
 .../traversal/util/DefaultTraversal.java        |  37 -----
 .../process/traversal/TraversalTest.java        | 145 -------------------
 .../tinkerpop/gremlin/driver/Connection.java    |   6 +-
 .../driver/remote/DriverRemoteConnection.java   |  14 ++
 .../driver/remote/DriverRemoteTraversal.java    |  16 +-
 .../DriverRemoteTraversalSideEffects.java       |  22 ++-
 .../DriverRemoteTraversalSideEffectsTest.java   |  12 +-
 gremlin-groovy/pom.xml                          |   5 +
 .../server/GremlinServerIntegrateTest.java      |  25 ++++
 .../process/traversal/CoreTraversalTest.java    |  42 ------
 15 files changed, 131 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/pom.xml
----------------------------------------------------------------------
diff --git a/gremlin-core/pom.xml b/gremlin-core/pom.xml
index 0594448..e8f3a34 100644
--- a/gremlin-core/pom.xml
+++ b/gremlin-core/pom.xml
@@ -61,11 +61,6 @@ limitations under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-            <version>3.3.1</version>
-        </dependency>
         <!-- LOGGING -->
         <dependency>
             <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
index 8506ad7..f4e3976 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * A simple abstraction of a "connection" to a "server" that is capable of processing a {@link
Traversal} and
@@ -43,9 +44,16 @@ public interface RemoteConnection extends AutoCloseable {
     public <E> Iterator<Traverser.Admin<E>> submit(final Traversal<?,
E> traversal) throws RemoteConnectionException;
 
     /**
-     * Submits {@link Traversal} {@link Bytecode} to a server and returns a {@link Traversal}.
-     * The {@link Traversal} is an abstraction over two types of results that can be returned
as part of the
+     * Submits {@link Traversal} {@link Bytecode} to a server and returns a {@link RemoteTraversal}.
+     * The {@link RemoteTraversal} is an abstraction over two types of results that can be
returned as part of the
      * response from the server: the results of the {@link Traversal} itself and the side-effects
that it produced.
      */
     public <E> RemoteTraversal<?,E> submit(final Bytecode bytecode) throws RemoteConnectionException;
+
+    /**
+     * Submits {@link Traversal} {@link Bytecode} to a server and returns a promise of a
{@link RemoteTraversal}.
+     * The {@link RemoteTraversal} is an abstraction over two types of results that can be
returned as part of the
+     * response from the server: the results of the {@link Traversal} itself and the side-effects
that it produced.
+     */
+    public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final
Bytecode bytecode) throws RemoteConnectionException;
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
index 9c893c2..57b0cda 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
@@ -39,7 +39,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
 public interface RemoteTraversal<S,E> extends Traversal.Admin<S,E> {
 
     /**
-     * Returns remote side-effects generated by the traversal so that they can accessible
to the client. Note that
+     * Returns remote side-effects generated by the traversal so that they can be accessible
to the client. Note that
      * "side-effect" refers to the value in "a" in the traversal {@code g.V().aggregate('a').values('name')}.
      */
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
index 6b2be96..3e19097 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
@@ -21,12 +21,17 @@ package org.apache.tinkerpop.gremlin.process.remote.traversal.step.map;
 import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
 import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException;
 import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Sends a {@link Traversal} to a {@link RemoteConnection} and iterates back the results.
@@ -38,6 +43,7 @@ public final class RemoteStep<S, E> extends AbstractStep<S, E>
{
 
     private transient RemoteConnection remoteConnection;
     private RemoteTraversal<?, E> remoteTraversal;
+    private final AtomicReference<CompletableFuture<Traversal<?, E>>> traversalFuture
= new AtomicReference<>(null);
 
     public RemoteStep(final Traversal.Admin traversal, final RemoteConnection remoteConnection)
{
         super(traversal);
@@ -51,14 +57,26 @@ public final class RemoteStep<S, E> extends AbstractStep<S, E>
{
 
     @Override
     protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
-        if (null == this.remoteTraversal) {
-            try {
-                this.remoteTraversal = this.remoteConnection.submit(this.traversal.getBytecode());
-                this.traversal.setSideEffects(this.remoteTraversal.getSideEffects());
-            } catch (final RemoteConnectionException sce) {
-                throw new IllegalStateException(sce);
+        if (null == this.remoteTraversal) promise().join();
+        return this.remoteTraversal.nextTraverser();
+    }
+
+    /**
+     * Submits the traversal asynchronously to a "remote" using {@link RemoteConnection#submitAsync(Bytecode)}.
+     */
+    public CompletableFuture<Traversal<?, E>> promise() {
+        try {
+            if (null == traversalFuture.get()) {
+                traversalFuture.set(this.remoteConnection.submitAsync(this.traversal.getBytecode()).<Traversal<?,
E>>thenApply(t -> {
+                    this.remoteTraversal = (RemoteTraversal<?, E>) t;
+                    this.traversal.setSideEffects(this.remoteTraversal.getSideEffects());
+                    return traversal;
+                }));
             }
+
+            return traversalFuture.get();
+        } catch (RemoteConnectionException rce) {
+            throw new IllegalStateException(rce);
         }
-        return this.remoteTraversal.nextTraverser();
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java
index e4ba5a6..04f5127 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java
@@ -18,8 +18,9 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal;
 
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.step.map.RemoteStep;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.ProfileSideEffectStep;
@@ -43,11 +44,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.Spliterator;
 import java.util.Spliterators;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Stream;
@@ -148,39 +145,21 @@ public interface Traversal<S, E> extends Iterator<E>, Serializable,
Cloneable, A
 
     /**
      * Starts a promise to execute a function on the current {@code Traversal} that will
be completed in the future.
-     * This implementation uses {@link Admin#traversalExecutorService} to execute the supplied
-     * {@code traversalFunction}.
+     * Note that this method can only be used if the {@code Traversal} is constructed using
+     * {@link TraversalSource#withRemote(Configuration)}. Calling this method otherwise will
yield an
+     * {@code IllegalStateException}.
      */
     public default <T> CompletableFuture<T> promise(final Function<Traversal,
T> traversalFunction) {
-        return promise(traversalFunction, Admin.traversalExecutorService);
-    }
-
-    /**
-     * Starts a promise to execute a function on the current {@code Traversal} that will
be completed in the future.
-     * This implementation uses the caller supplied {@code ExecutorService} to execute the
{@code traversalFunction}.
-     */
-    public default <T> CompletableFuture<T> promise(final Function<Traversal,
T> traversalFunction, final ExecutorService service) {
-        final CompletableFuture<T> promise = new CompletableFuture<>();
-        final Future iterationFuture = service.submit(() -> {
-            try {
-                promise.complete(traversalFunction.apply(this));
-            } catch (Exception ex) {
-                // the promise may have been cancelled by the caller, in which case, there
is no need to attempt
-                // another write on completion
-                if (!promise.isDone()) promise.completeExceptionally(ex);
-            }
-        });
-
-        // if the user cancels the promise then attempt to kill the iteration.
-        promise.exceptionally(t -> {
-            if (t instanceof CancellationException) {
-                iterationFuture.cancel(true);
-            }
-
-            return null;
-        });
-
-        return promise;
+        // apply strategies to see if RemoteStrategy has any effect (i.e. add RemoteStep)
+        if (!this.asAdmin().isLocked()) this.asAdmin().applyStrategies();
+
+        // use the end step so the results are bulked
+        final Step<?, E> endStep = this.asAdmin().getEndStep();
+        if (endStep instanceof RemoteStep) {
+            return ((RemoteStep) endStep).promise().thenApply(traversalFunction);
+        } else {
+            throw new IllegalStateException("Only traversals created using withRemote() can
be used in an async way");
+        }
     }
 
     /**
@@ -297,12 +276,6 @@ public interface Traversal<S, E> extends Iterator<E>, Serializable,
Cloneable, A
     public interface Admin<S, E> extends Traversal<S, E> {
 
         /**
-         * Service that handles promises.
-         */
-        static final ExecutorService traversalExecutorService = Executors.newCachedThreadPool(
-                new BasicThreadFactory.Builder().namingPattern("traversal-executor-%d").build());
-
-        /**
          * Get the {@link Bytecode} associated with the construction of this traversal.
          *
          * @return the byte code representation of the traversal

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
index 6ce6dfe..3c21e37 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
@@ -43,9 +43,6 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Function;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -328,40 +325,6 @@ public class DefaultTraversal<S, E> implements Traversal.Admin<S,
E> {
         this.graph = graph;
     }
 
-    /**
-     * Override of {@link Traversal#promise(Function)} that is aware of graph transactions.
-     */
-    @Override
-    public <T2> CompletableFuture<T2> promise(final Function<Traversal, T2>
traversalFunction) {
-        return this.promise(traversalFunction, Traversal.Admin.traversalExecutorService);
-    }
-
-    /**
-     * Override of {@link Traversal#promise(Function)} that is aware of graph transactions.
In a transactional graph
-     * a promise represents the full scope of a transaction, even if the graph is only partially
iterated.
-     */
-    @Override
-    public <T2> CompletableFuture<T2> promise(final Function<Traversal, T2>
traversalFunction, final ExecutorService service) {
-        if (graph != null && graph.features().graph().supportsTransactions()) {
-            final Function<Traversal, T2> transactionAware = traversal -> {
-
-                try {
-                    if (graph.tx().isOpen()) graph.tx().rollback();
-                    final T2 obj = traversalFunction.apply(traversal);
-                    if (graph.tx().isOpen()) graph.tx().commit();
-                    return obj;
-                } catch (Exception ex) {
-                    if (graph.tx().isOpen()) graph.tx().rollback();
-                    throw ex;
-                }
-            };
-
-            return Traversal.Admin.super.promise(transactionAware, service);
-        } else {
-            return Traversal.Admin.super.promise(traversalFunction, service);
-        }
-    }
-
     @Override
     public boolean equals(final Object other) {
         return other != null && other.getClass().equals(this.getClass()) &&
this.equals(((Traversal.Admin) other));

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java
b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java
index aa1b99b..c427d8e 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java
@@ -30,34 +30,22 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Optional;
-import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsCollectionContaining.hasItems;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 public class TraversalTest {
 
-    private final ExecutorService service = Executors.newFixedThreadPool(2);
-
     @Test
     public void shouldTryNext() {
         final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3);
@@ -117,139 +105,6 @@ public class TraversalTest {
     }
 
     @Test
-    public void shouldPromiseNextThreeUsingForkJoin() throws Exception {
-        final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3, 4, 5, 6,
7);
-        final CompletableFuture<List<Integer>> promiseFirst = t.promise(traversal
-> traversal.next(3));
-        final List<Integer> listFirst = promiseFirst.get();
-        assertEquals(3, listFirst.size());
-        assertThat(listFirst, hasItems(1 ,2, 3));
-        assertThat(t.hasNext(), is(true));
-        assertThat(promiseFirst.isDone(), is(true));
-
-        final CompletableFuture<List<Integer>> promiseSecond = t.promise(traversal
-> traversal.next(3));
-        final List<Integer> listSecond = promiseSecond.get();
-        assertEquals(3, listSecond.size());
-        assertThat(listSecond, hasItems(4, 5, 6));
-        assertThat(t.hasNext(), is(true));
-        assertThat(promiseSecond.isDone(), is(true));
-
-        final CompletableFuture<List<Integer>> promiseThird = t.promise(traversal
-> traversal.next(3));
-        final List<Integer> listThird = promiseThird.get();
-        assertEquals(1, listThird.size());
-        assertThat(listThird, hasItems(7));
-        assertThat(t.hasNext(), is(false));
-        assertThat(promiseThird.isDone(), is(true));
-
-        final CompletableFuture<Integer> promiseDead = t.promise(traversal -> (Integer)
traversal.next());
-        final AtomicBoolean dead = new AtomicBoolean(false);
-        promiseDead.exceptionally(tossed -> {
-            dead.set(tossed instanceof NoSuchElementException);
-            return null;
-        });
-
-        try {
-            promiseDead.get(10000, TimeUnit.MILLISECONDS);
-            fail("Should have gotten an exception");
-        } catch (Exception ex) {
-            if (ex instanceof TimeoutException) {
-                fail("This should not have timed out but should have gotten an exception
caught above in the exceptionally() clause");
-            }
-
-            assertThat(ex.getCause(), instanceOf(NoSuchElementException.class));
-        }
-
-        assertThat(dead.get(), is(true));
-        assertThat(t.hasNext(), is(false));
-        assertThat(promiseDead.isDone(), is(true));
-    }
-
-    @Test
-    public void shouldPromiseNextThreeUsingSpecificExecutor() throws Exception {
-        final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3, 4, 5, 6,
7);
-        final CompletableFuture<List<Integer>> promiseFirst = t.promise(traversal
-> traversal.next(3), service);
-        final List<Integer> listFirst = promiseFirst.get();
-        assertEquals(3, listFirst.size());
-        assertThat(listFirst, hasItems(1 ,2, 3));
-        assertThat(t.hasNext(), is(true));
-        assertThat(promiseFirst.isDone(), is(true));
-
-        final CompletableFuture<List<Integer>> promiseSecond = t.promise(traversal
-> traversal.next(3), service);
-        final List<Integer> listSecond = promiseSecond.get();
-        assertEquals(3, listSecond.size());
-        assertThat(listSecond, hasItems(4, 5, 6));
-        assertThat(t.hasNext(), is(true));
-        assertThat(promiseSecond.isDone(), is(true));
-
-        final CompletableFuture<List<Integer>> promiseThird = t.promise(traversal
-> traversal.next(3), service);
-        final List<Integer> listThird = promiseThird.get();
-        assertEquals(1, listThird.size());
-        assertThat(listThird, hasItems(7));
-        assertThat(t.hasNext(), is(false));
-        assertThat(promiseThird.isDone(), is(true));
-
-        final CompletableFuture<Integer> promiseDead = t.promise(traversal -> (Integer)
traversal.next(), service);
-        final AtomicBoolean dead = new AtomicBoolean(false);
-        promiseDead.exceptionally(tossed -> {
-            dead.set(tossed instanceof NoSuchElementException);
-            return null;
-        });
-
-        try {
-            promiseDead.get(10000, TimeUnit.MILLISECONDS);
-            fail("Should have gotten an exception");
-        } catch (Exception ex) {
-            if (ex instanceof TimeoutException) {
-                fail("This should not have timed out but should have gotten an exception
caught above in the exceptionally() clause");
-            }
-
-            assertThat(ex.getCause(), instanceOf(NoSuchElementException.class));
-        }
-
-        assertThat(dead.get(), is(true));
-        assertThat(t.hasNext(), is(false));
-        assertThat(promiseDead.isDone(), is(true));
-    }
-
-    @Test
-    public void shouldInterruptTraversalFunction() throws Exception {
-        final Random rand = new Random(1234567890);
-
-        // infinite traversal
-        final MockTraversal<Integer> t = new MockTraversal<>(IntStream.generate(rand::nextInt).iterator());
-
-        // iterate a bunch of it
-        final CompletableFuture<List<Integer>> promise10 = t.promise(traversal
-> traversal.next(10), service);
-        assertEquals(10, promise10.get(10000, TimeUnit.MILLISECONDS).size());
-        final CompletableFuture<List<Integer>> promise100 = t.promise(traversal
-> traversal.next(100), service);
-        assertEquals(100, promise100.get(10000, TimeUnit.MILLISECONDS).size());
-        final CompletableFuture<List<Integer>> promise1000 = t.promise(traversal
-> traversal.next(1000), service);
-        assertEquals(1000, promise1000.get(10000, TimeUnit.MILLISECONDS).size());
-
-        // this is endless, so let's cancel
-        final CompletableFuture<List<Integer>> promiseForevers = t.promise(traversal
-> traversal.next(Integer.MAX_VALUE), service);
-
-        // specify what to do on exception
-        final AtomicBoolean failed = new AtomicBoolean(false);
-        promiseForevers.exceptionally(ex -> {
-            failed.set(true);
-            return null;
-        });
-
-        try {
-            // let it actually iterate a moment
-            promiseForevers.get(500, TimeUnit.MILLISECONDS);
-            fail("This should have timed out because the traversal has infinite items in
it");
-        } catch (TimeoutException tex) {
-
-        }
-
-        assertThat(promiseForevers.isDone(), is(false));
-        promiseForevers.cancel(true);
-        assertThat(failed.get(), is(true));
-        assertThat(promiseForevers.isDone(), is(true));
-    }
-
-    @Test
     public void shouldIterate() {
         final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3, 4, 5, 6,
7);
         assertThat(t.hasNext(), is(true));

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 972e838..9a2180e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -208,7 +208,7 @@ final class Connection {
                             logger.debug(String.format("Write on connection %s failed", thisConnection.getConnectionInfo()),
f.cause());
                         thisConnection.isDead = true;
                         thisConnection.returnToPool();
-                        future.completeExceptionally(f.cause());
+                        cluster.executor().submit(() -> future.completeExceptionally(f.cause()));
                     } else {
                         final LinkedBlockingQueue<Result> resultLinkedBlockingQueue
= new LinkedBlockingQueue<>();
                         final CompletableFuture<Void> readCompleted = new CompletableFuture<>();
@@ -250,8 +250,8 @@ final class Connection {
 
                         final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue,
readCompleted);
                         pending.put(requestMessage.getRequestId(), handler);
-                        future.complete(new ResultSet(handler, cluster.executor(), readCompleted,
-                                requestMessage, pool.host));
+                        cluster.executor().submit(() -> future.complete(
+                                new ResultSet(handler, cluster.executor(), readCompleted,
requestMessage, pool.host)));
                     }
                 });
         channel.writeAndFlush(requestMessage, requestPromise);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
index bb2d33d..be3fa28 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
 /**
@@ -163,6 +164,10 @@ public class DriverRemoteConnection implements RemoteConnection {
         }
     }
 
+    /**
+     * @deprecated As of release 3.2.2, replaced by {@link #submit(Bytecode)}.
+     */
+    @Deprecated
     @Override
     public <E> Iterator<Traverser.Admin<E>> submit(final Traversal<?,
E> t) throws RemoteConnectionException {
         try {
@@ -189,6 +194,15 @@ public class DriverRemoteConnection implements RemoteConnection {
     }
 
     @Override
+    public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final
Bytecode bytecode) throws RemoteConnectionException {
+        try {
+            return client.submitAsync(bytecode).thenApply(rs -> new DriverRemoteTraversal<>(rs,
client, attachElements, conf));
+        } catch (Exception ex) {
+            throw new RemoteConnectionException(ex);
+        }
+    }
+
+    @Override
     public void close() throws Exception {
         try {
             client.close();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
index 88ee794..d3f290c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
@@ -66,28 +66,18 @@ public class DriverRemoteTraversal<S, E> extends AbstractRemoteTraversal<S,
E> {
         }
 
         this.rs = rs;
-        this.sideEffects = new DriverRemoteTraversalSideEffects(
-                client,
+        this.sideEffects = new DriverRemoteTraversalSideEffects(client,
                 rs.getOriginalRequestMessage().getRequestId(),
-                rs.getHost());
+                rs.getHost(), rs.allItemsAvailableAsync());
     }
 
     /**
      * Gets a side-effect from the server. Do not call this method prior to completing the
iteration of the
-     * {@link DriverRemoteTraversal} that spawned this as the side-effect will not be ready.
If this method is called
-     * prior to iteration being complete, then it will block until the traversal notifies
it of completion. Generally
+     * {@link DriverRemoteTraversal} that spawned this as the side-effect will not be ready.
Generally
      * speaking, the common user would not get side-effects this way - they would use a call
to {@code cap()}.
      */
     @Override
     public RemoteTraversalSideEffects getSideEffects() {
-        // wait for the read to complete (i.e. iteration on the server) before allowing the
caller to get the
-        // side-effect. calling prior to this will result in the side-effect not being found.
of course, the
-        // bad part here is that the method blocks indefinitely waiting for the result, but
it prevents the
-        // test failure problems that happen on slower systems. in practice, it's unlikely
that a user would
-        // try to get a side-effect prior to iteration, but since the API allows it, this
at least prevents
-        // the error.
-        rs.allItemsAvailableAsync().join();
-
         return this.sideEffects;
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
index 8d6fa98..4305567 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
@@ -33,6 +33,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Java driver implementation of {@link TraversalSideEffects}. This class is not thread safe.
@@ -50,15 +51,26 @@ public class DriverRemoteTraversalSideEffects extends AbstractRemoteTraversalSid
 
     private boolean closed = false;
     private boolean retrievedAllKeys = false;
+    private final CompletableFuture<Void> ready;
 
-    public DriverRemoteTraversalSideEffects(final Client client, final UUID serverSideEffect, final Host host) {
+    public DriverRemoteTraversalSideEffects(final Client client, final UUID serverSideEffect, final Host host,
+                                            final CompletableFuture<Void> ready) {
         this.client = client;
         this.serverSideEffect = serverSideEffect;
         this.host = host;
+        this.ready = ready;
     }
 
     @Override
     public <V> V get(final String key) throws IllegalArgumentException {
+        // wait for the read to complete (i.e. iteration on the server) before allowing the caller to get the
+        // side-effect. calling prior to this will result in the side-effect not being found. of course, the
+        // bad part here is that the method blocks indefinitely waiting for the result, but it prevents the
+        // test failure problems that happen on slower systems. in practice, it's unlikely that a user would
+        // try to get a side-effect prior to iteration, but since the API allows it, this at least prevents
+        // the error.
+        ready.join();
+
         if (!keys().contains(key)) throw TraversalSideEffects.Exceptions.sideEffectKeyDoesNotExist(key);
 
         if (!sideEffects.containsKey(key)) {
@@ -91,6 +103,14 @@ public class DriverRemoteTraversalSideEffects extends AbstractRemoteTraversalSid
 
     @Override
     public Set<String> keys() {
+        // wait for the read to complete (i.e. iteration on the server) before allowing the caller to get the
+        // side-effect. calling prior to this will result in the side-effect not being found. of course, the
+        // bad part here is that the method blocks indefinitely waiting for the result, but it prevents the
+        // test failure problems that happen on slower systems. in practice, it's unlikely that a user would
+        // try to get a side-effect prior to iteration, but since the API allows it, this at least prevents
+        // the error.
+        ready.join();
+
         if (closed && !retrievedAllKeys) throw new IllegalStateException("Traversal has been closed - side-effect keys cannot be retrieved");
 
         if (!retrievedAllKeys) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java
index 27d0079..4e6df93 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java
@@ -52,7 +52,9 @@ public class DriverRemoteTraversalSideEffectsTest extends AbstractResultQueueTes
         mockClientForCall(client);
 
         final UUID sideEffectKey = UUID.fromString("31dec2c6-b214-4a6f-a68b-996608dce0d9");
-        final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null);
+        final CompletableFuture<Void> ready = new CompletableFuture<>();
+        ready.complete(null);
+        final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null, ready);
 
         assertEquals(1, sideEffects.keys().size());
         sideEffects.close();
@@ -73,7 +75,9 @@ public class DriverRemoteTraversalSideEffectsTest extends AbstractResultQueueTes
         mockClientForCall(client);
         mockClientForCall(client);
         final UUID sideEffectKey = UUID.fromString("31dec2c6-b214-4a6f-a68b-996608dce0d9");
-        final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null);
+        final CompletableFuture<Void> ready = new CompletableFuture<>();
+        ready.complete(null);
+        final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null, ready);
 
         assertNotNull(sideEffects.get("test-0"));
         sideEffects.close();
@@ -93,7 +97,9 @@ public class DriverRemoteTraversalSideEffectsTest extends AbstractResultQueueTes
         mockClientForCall(client);
 
         final UUID sideEffectKey = UUID.fromString("31dec2c6-b214-4a6f-a68b-996608dce0d9");
-        final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null);
+        final CompletableFuture<Void> ready = new CompletableFuture<>();
+        ready.complete(null);
+        final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null, ready);
 
         sideEffects.close();
         sideEffects.close();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-groovy/pom.xml
----------------------------------------------------------------------
diff --git a/gremlin-groovy/pom.xml b/gremlin-groovy/pom.xml
index dae5e8a..b82c986 100644
--- a/gremlin-groovy/pom.xml
+++ b/gremlin-groovy/pom.xml
@@ -65,6 +65,11 @@ limitations under the License.
             <artifactId>jBCrypt</artifactId>
             <version>jbcrypt-0.4</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.3.1</version>
+        </dependency>
         <!-- TEST -->
         <dependency>
             <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index b3dbe29..420bd05 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -46,12 +46,14 @@ import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.InterpreterModeCust
 import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.SimpleSandboxExtension;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.TimedInterruptCustomizerProvider;
 import org.apache.tinkerpop.gremlin.process.remote.RemoteGraph;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.server.channel.NioChannelizer;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
 import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
@@ -76,6 +78,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -990,4 +993,26 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
         final BulkSet localBSideEffects = se.get("b");
         assertThat(localBSideEffects.isEmpty(), is(false));
     }
+
+    @Test
+    public void shouldDoNonBlockingPromiseWithRemote() throws Exception {
+        final Graph graph = EmptyGraph.instance();
+        final GraphTraversalSource g = graph.traversal().withRemote(conf);
+        g.addV("person").property("age", 20).promise(Traversal::iterate).join();
+        g.addV("person").property("age", 10).promise(Traversal::iterate).join();
+        assertEquals(50L, g.V().hasLabel("person").map(Lambda.function("it.get().value('age') + 10")).sum().promise(t -> t.next()).join());
+        g.addV("person").property("age", 20).promise(Traversal::iterate).join();
+
+        final Traversal traversal = g.V().hasLabel("person").has("age", 20).values("age");
+        assertEquals(20, traversal.promise(t -> ((Traversal) t).next(1).get(0)).join());
+        assertEquals(20, traversal.next());
+        assertThat(traversal.hasNext(), is(false));
+
+        final Traversal traversalCloned = g.V().hasLabel("person").has("age", 20).values("age");
+        assertEquals(20, traversalCloned.next());
+        assertEquals(20, traversalCloned.promise(t -> ((Traversal) t).next(1).get(0)).join());
+        assertThat(traversalCloned.promise(t -> ((Traversal) t).hasNext()).join(), is(false));
+
+        assertEquals(3, g.V().promise(Traversal::toList).join().size());
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
index 050f9de..68f8217 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.process.traversal;
 
 import org.apache.tinkerpop.gremlin.ExceptionCoverage;
 import org.apache.tinkerpop.gremlin.FeatureRequirement;
-import org.apache.tinkerpop.gremlin.FeatureRequirementSet;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
@@ -41,9 +40,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
@@ -311,42 +307,4 @@ public class CoreTraversalTest extends AbstractGremlinProcessTest {
         }
 
     }
-
-    @Test
-    @FeatureRequirementSet(FeatureRequirementSet.Package.SIMPLE)
-    public void shouldUsePromiseAndControlTransactionsIfAvailable() throws Exception {
-        // this test will validate that transactional graphs can properly open/close transactions within a promise.
-        // as there is a feature check, non-transactional graphs can use this to simply exercise the promise API
-        final Vertex vAdded = g.addV("person").property("name", "stephen").promise(t -> (Vertex) t.next()).get(10000, TimeUnit.MILLISECONDS);
-        final Vertex vRead = g.V().has("name", "stephen").next();
-        assertEquals(vAdded.id(), vRead.id());
-
-        // transaction should have been committed at this point so test the count in this thread to validate persistence
-        assertVertexEdgeCounts(graph, 1, 0);
-
-        // cancel a promise and ensure the transaction ended in failure. hold the traversal in park until it can be
-        // interrupted, then the promise will have to rollback the transaction.
-        final CompletableFuture promiseToCancel = g.addV("person").property("name", "marko").sideEffect(traverser -> {
-            try {
-                Thread.sleep(100000);
-            } catch (Exception ignored) {
-
-            }
-        }).promise(t -> (Vertex) t.next());
-
-        try {
-            promiseToCancel.get(500, TimeUnit.MILLISECONDS);
-            fail("Should have timed out");
-        } catch (TimeoutException te) {
-
-        }
-
-        promiseToCancel.cancel(true);
-
-        // graphs that support transactions will rollback the transaction
-        if (graph.features().graph().supportsTransactions())
-            assertVertexEdgeCounts(graph, 1, 0);
-        else
-            assertVertexEdgeCounts(graph, 2, 0);
-    }
 }


Mime
View raw message