tinkerpop-commits mailing list archives

From ok...@apache.org
Subject [2/8] incubator-tinkerpop git commit: Improved consistency of transaction management in gremlin server.
Date Wed, 09 Dec 2015 20:48:57 GMT
Improved consistency of transaction management in gremlin server.

Sessionless requests now have special handling for GraphTraversal return results.  Gremlin
Server now force-iterates those results to a List when the traversal includes Mutating steps. It
then commits and iterates that List. For all other results, the commit occurs at the same point
(after evaluation, before result iteration) but without forced iteration, so results continue to
stream as they always have.
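The commit ordering described above can be illustrated with a small, self-contained Java sketch. It is not Gremlin Server's implementation: Step, MutatingStep, SketchTraversal, SessionlessCommit and commitThenIterate are hypothetical stand-ins for GraphTraversal, the Mutating marker interface and the logic this commit adds to AbstractEvalOpProcessor, and the commit is modeled as a plain Runnable rather than GraphManager.commitAll().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins, NOT the real TinkerPop API: just enough
// structure to show when the commit happens relative to iteration.
interface Step {}

// Marker for steps that modify the graph (analogous in spirit to Mutating).
interface MutatingStep extends Step {}

// A traversal is modeled as its step list plus a lazy result iterator.
final class SketchTraversal implements Iterator<Object> {
    final List<Step> steps;
    private final Iterator<Object> results;

    SketchTraversal(List<Step> steps, Iterator<Object> results) {
        this.steps = steps;
        this.results = results;
    }

    @Override public boolean hasNext() { return results.hasNext(); }
    @Override public Object next() { return results.next(); }
}

final class SessionlessCommit {
    // Returns the iterator whose results should be sent to the client,
    // having committed the transaction first.
    static Iterator<Object> commitThenIterate(Iterator<Object> itty, Runnable commitAll) {
        Iterator<Object> toIterate = itty;
        if (itty instanceof SketchTraversal) {
            SketchTraversal t = (SketchTraversal) itty;
            boolean hasMutating = t.steps.stream().anyMatch(s -> s instanceof MutatingStep);
            if (hasMutating) {
                // Force iteration so the mutations happen inside the transaction;
                // the cost is that results are held in memory instead of streamed.
                List<Object> realized = new ArrayList<>();
                itty.forEachRemaining(realized::add);
                toIterate = realized.iterator();
            }
        }
        // Commit before any result reaches the client, so a failed commit
        // surfaces as an error instead of following a false success.
        commitAll.run();
        return toIterate;
    }
}
```

A traversal whose only writes happen inside a lambda carries no MutatingStep, so in this sketch (as in the server change) it takes the streaming path and is committed before it is iterated, which is why the documentation in this commit asks users to self-iterate such traversals.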

Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/dbc7f94c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/dbc7f94c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/dbc7f94c

Branch: refs/heads/TINKERPOP-1033
Commit: dbc7f94c343f5ca8d4e51c0354280b552c231967
Parents: f1b5702
Author: Stephen Mallette <spmva@genoprime.com>
Authored: Tue Dec 8 19:50:18 2015 -0500
Committer: Stephen Mallette <spmva@genoprime.com>
Committed: Tue Dec 8 19:50:18 2015 -0500

 CHANGELOG.asciidoc                              |  1 +
 .../src/reference/gremlin-applications.asciidoc | 29 +++++++++
 .../upgrade/release-3.1.x-incubating.asciidoc   | 17 ++++++
 .../gremlin/groovy/engine/GremlinExecutor.java  |  1 -
 .../server/op/AbstractEvalOpProcessor.java      | 63 +++++++++++++++-----
 .../gremlin/server/op/session/Session.java      |  1 -
 .../server/GremlinDriverIntegrateTest.java      |  4 +-
 7 files changed, 97 insertions(+), 19 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 2bf0df7..589851d 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -27,6 +27,7 @@ TinkerPop 3.1.1 (NOT OFFICIALLY RELEASED YET)
 * Fixed a `NullPointerException` bug in `PeerPressureVertexProgram` that occurred when an adjacency traversal was not provided.
+* Improved Transaction Management consistency in Gremlin Server.
 * Fixed a long standing issue around having to use `reduceByKey()` on input data to Spark. It is no longer required.
 * Added `Spark` static object to allow "file system" control of persisted RDDs in Spark.
 * Improved logging control during builds with Maven.

diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc
index ea47342..812a1d7 100644
--- a/docs/src/reference/gremlin-applications.asciidoc
+++ b/docs/src/reference/gremlin-applications.asciidoc
@@ -1132,6 +1132,35 @@ of a `Graph` is bound to the thread it was initialized in.
 A session is a "heavier" approach to the simple "request/response" approach of sessionless requests, but is sometimes
 necessary for a given use case.
+Considering Transactions
+Gremlin Server performs automatic transaction handling for "sessionless" requests (i.e. no state between requests). It
+will automatically commit or roll back transactions depending on the success or failure of the script. When submitting
+requests, it is important to recognize that transaction management procedures may differ slightly depending on what is
+returned from the script.
+* If the script returns anything other than a `GraphTraversal`, a commit will be called just before results are
+iterated back to the client.
+* If the result is a `GraphTraversal` that has no `Mutating` steps, a commit will be called just before results are
+iterated back to the client.
+* If the result is a `GraphTraversal` that has one or more `Mutating` steps (i.e. one that modifies the `Graph`),
+the `GraphTraversal` will be iterated, its results pushed to a `List`, commit called, and the results in the `List`
+iterated back to the client.
+The last bullet point above warrants additional explanation.  Assume that the script `g.addV('name','stephen')` is
+submitted to the server.  That script returns a `GraphTraversal` and has a `Mutating` step. The traversal needs to
+be iterated in order for the mutations to take place, or else the commit will have no effect. That's why Gremlin
+Server attempts to detect these types of traversals and treat them specially. The unfortunate downside is that the
+results of this script must be realized in memory, which means that they are not streamed back to the client.
+For small results, this is unlikely to present an issue.
+NOTE: It is possible to bypass the transaction management system around `GraphTraversal` by using a lambda. Gremlin
+Server only looks for `Mutating` steps, so a script like `g.V().sideEffect{it.get().property('color','green')}`
+would not be iterated prior to commit, and its mutations would not be realized.  If lambdas must be used, then it is
+important to self-iterate by doing something like: `g.V().sideEffect{it.get().property('color','green')}.toList()`.
 Developing a Driver

diff --git a/docs/src/upgrade/release-3.1.x-incubating.asciidoc b/docs/src/upgrade/release-3.1.x-incubating.asciidoc
index 5cccb94..9b6d205 100644
--- a/docs/src/upgrade/release-3.1.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.1.x-incubating.asciidoc
@@ -32,6 +32,23 @@ Please see the link:https://github.com/apache/incubator-tinkerpop/blob/3.1.1-inc
 Upgrading for Users
+Gremlin Server Transaction Management
+There were some changes to how Gremlin Server manages transactions on sessionless requests to make the process more
+consistent across different graph databases. Commits now occur after evaluation and prior to result iteration, which
+ensures an earlier failure (i.e. before any results reach the client and falsely indicate success) and better
+handling of a result that is a `GraphTraversal` that mutates the `Graph`.
+This change is unlikely to require changes to user code, but it does introduce some items to be aware of
+when issuing scripts. Most specifically, using lambdas in a request that returns a `GraphTraversal` designed to modify
+the `Graph` will fail to do so unless the traversal is self-iterated.  In other words, instead of sending:
+`g.V().sideEffect{it.get().property('color','green')}` one would send:
+`g.V().sideEffect{it.get().property('color','green')}.toList()`
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-1035[TINKERPOP-1035], Documentation - Considering Transactions]
 Deprecated credentialsDbLocation

diff --git a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
index 762a60d..f213d0d 100644
--- a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
+++ b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
@@ -54,7 +54,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;

diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
index eaa7d2b..fb415b5 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
@@ -18,26 +18,40 @@
 package org.apache.tinkerpop.gremlin.server.op;
+import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
+import io.netty.channel.ChannelFuture;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.server.handler.StateKey;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.server.Context;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
 import org.apache.tinkerpop.gremlin.server.OpProcessor;
 import org.apache.tinkerpop.gremlin.server.Settings;
 import org.apache.tinkerpop.gremlin.server.util.MetricManager;
+import org.apache.tinkerpop.gremlin.structure.io.Mapper;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
 import org.apache.tinkerpop.gremlin.util.function.ThrowingConsumer;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.commons.lang.time.StopWatch;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.io.ByteBufferOutputStream;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import javax.script.Bindings;
+import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -60,6 +74,7 @@ import static com.codahale.metrics.MetricRegistry.name;
 public abstract class AbstractEvalOpProcessor implements OpProcessor {
     private static final Logger logger = LoggerFactory.getLogger(AbstractEvalOpProcessor.class);
     public static final Timer evalOpTimer = MetricManager.INSTANCE.getTimer(name(GremlinServer.class, "op", "eval"));
+    static final Meter errorMeter = MetricManager.INSTANCE.getMeter(name(GremlinServer.class,
      * This may or may not be the full set of invalid binding keys.  It is dependent on the static imports made to
@@ -213,35 +228,60 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor
+            return;
         // timer for the total serialization time
         final StopWatch stopWatch = new StopWatch();
+        // if we manage the transactions then we need to commit stuff now that eval is complete.
+        Iterator toIterate = itty;
+        if (manageTransactions) {
+            // If the eval returned a Traversal then it gets special treatment
+            if (itty instanceof GraphTraversal) {
+                final GraphTraversal traversal = (GraphTraversal) itty;
+                // if it has Mutating steps then it needs to be iterated to produce the mutations in the transaction.
+                // after it is iterated then we can commit.  of course, this comes at the expense of being able
+                // to stream results back to the client as the result has to be realized into memory.
+                //
+                // lambdas are a loophole here.  for now, users will need to self iterate if they need lambdas :/
+                final boolean hasMutating = traversal.asAdmin().getSteps().stream().anyMatch(s -> s instanceof Mutating);
+                if (hasMutating) toIterate = IteratorUtils.list(itty).iterator();
+            }
+            // in any case, the commit should occur because at this point a GraphTraversal has been iterated OR
+            // the script has been executed. failures will bubble up before we start to iterate results which makes
+            // sense as we wouldn't want to waste time sending back results when the transaction is going to end up
+            // failing
+            context.getGraphManager().commitAll();
+        }
         // the batch size can be overridden by the request
         final int resultIterationBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
         List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
-        while (itty.hasNext()) {
+        while (toIterate.hasNext()) {
             if (Thread.interrupted()) throw new InterruptedException();
             // have to check the aggregate size because it is possible that the channel is not writeable (below)
             // so iterating next() if the message is not written and flushed would bump the aggregate size beyond
             // the expected resultIterationBatchSize.  Total serialization time for the response remains in
             // effect so if the client is "slow" it may simply timeout.
-            if (aggregate.size() < resultIterationBatchSize) aggregate.add(itty.next());
+            if (aggregate.size() < resultIterationBatchSize) aggregate.add(toIterate.next());
             // send back a page of results if batch size is met or if it's the end of the results being iterated.
             // also check writeability of the channel to prevent OOME for slow clients.
             if (ctx.channel().isWritable()) {
-                if  (aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
-                    final ResponseStatusCode code = itty.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
+                if (aggregate.size() == resultIterationBatchSize || !toIterate.hasNext()) {
+                    final ResponseStatusCode code = toIterate.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
-                            .code(code)
-                            .result(aggregate).create());
+                                .code(code)
+                                .result(aggregate).create());
-                    aggregate = new ArrayList<>(resultIterationBatchSize);
+                    // only need to reset the aggregation list if there's more stuff to write
+                    if (toIterate.hasNext()) aggregate = new ArrayList<>(resultIterationBatchSize);
             } else {
                 // don't keep triggering this warning over and over again for the same request
@@ -265,15 +305,6 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor
-        // if there's no more items in the iterator then we've aggregated everything and are thus ready to
-        // commit stuff if transaction management is on.  exceptions should bubble up and be handle in the normal
-        // manner of things.  a final SUCCESS message will not have been sent (below) and we ship back an error.
-        // if transaction management is not enabled, then returning SUCCESS below is OK as this is a different
-        // usage context.  without transaction management enabled, the user is responsible for maintaining
-        // the transaction and will want a SUCCESS to know their eval and iteration was ok.  they would then
-        // potentially have a failure on commit on the next request.
-        if (manageTransactions) context.getGraphManager().commitAll();

diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
index eb140d8..70fa593 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
@@ -19,7 +19,6 @@
 package org.apache.tinkerpop.gremlin.server.op.session;
 import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.server.Context;
 import org.apache.tinkerpop.gremlin.server.GraphManager;
 import org.apache.tinkerpop.gremlin.server.Settings;

diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 673344e..614d161 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -697,12 +697,14 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = Cluster.build().create();
         final Client client = cluster.connect();
-        final Vertex vertexRequest1 = client.submit("graph.addVertex(\"name\",\"stephen\")").all().get().get(0).getVertex();
+        // this line is important because it tests GraphTraversal which has a certain transactional path
+        final Vertex vertexRequest1 = client.submit("g.addV(\"name\",\"stephen\")").all().get().get(0).getVertex();
         assertEquals("stephen", vertexRequest1.values("name").next());
         final Vertex vertexRequest2 = client.submit("graph.vertices().next()").all().get().get(0).getVertex();
         assertEquals("stephen", vertexRequest2.values("name").next());
+        // this line is important because it tests the other transactional path
         final Vertex vertexRequest3 = client.submit("graph.addVertex(\"name\",\"marko\")").all().get().get(0).getVertex();
         assertEquals("marko", vertexRequest3.values("name").next());
