tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [15/50] tinkerpop git commit: TINKERPOP-1511 Fixed problem in TraversalOpProcessor
Date Mon, 24 Oct 2016 17:32:58 GMT
TINKERPOP-1511 Fixed problem in TraversalOpProcessor

TraversalOpProcessor was sending back the final message before the tx commit was happening.
CTR


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/150a53d8
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/150a53d8
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/150a53d8

Branch: refs/heads/TINKERPOP-1389
Commit: 150a53d8a0ba2c2276f2dec49428f1ee14e67d70
Parents: efcff16
Author: Stephen Mallette <spmva@genoprime.com>
Authored: Mon Oct 17 13:53:32 2016 -0400
Committer: Stephen Mallette <spmva@genoprime.com>
Committed: Mon Oct 17 13:57:14 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   1 +
 .../op/traversal/TraversalOpProcessor.java      | 146 ++++++++++++++++++-
 2 files changed, 144 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/150a53d8/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 67506e0..5b4c578 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -27,6 +27,7 @@ TinkerPop 3.2.3 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 * Restructured Gremlin-Python's GraphSON I/O package to make it easier for users to register serializers/deserializers. (*breaking*)
+* Fixed a bug with `TraversalOpProcessor` that was returning a final result prior to committing the transaction.
 * Fixed a bug in `ConnectiveStrategy` where infix and/or was not correctly reasoning on `choose()` `HasNextStep` injections.
 * Increased performance of `CredentialGraph` authentication.
 * Removed Java 8 stream usage from `TraversalHelper` for performance reasons.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/150a53d8/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
index 88d8d90..4b559a3 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
@@ -22,6 +22,8 @@ import com.codahale.metrics.Timer;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
@@ -38,6 +40,8 @@ import org.apache.tinkerpop.gremlin.server.GraphManager;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
 import org.apache.tinkerpop.gremlin.server.OpProcessor;
 import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.handler.Frame;
+import org.apache.tinkerpop.gremlin.server.handler.StateKey;
 import org.apache.tinkerpop.gremlin.server.op.AbstractOpProcessor;
 import org.apache.tinkerpop.gremlin.server.op.OpProcessorException;
 import org.apache.tinkerpop.gremlin.server.util.MetricManager;
@@ -52,9 +56,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.script.SimpleBindings;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
@@ -369,7 +375,7 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
                     try {
                         // compile the traversal - without it getEndStep() has nothing in
it
                         traversal.applyStrategies();
-                        handleIterator(context, new TraverserIterator(traversal));
+                        handleIterator(context, new TraverserIterator(traversal), graph);
                     } catch (TimeoutException ex) {
                         final String errorMessage = String.format("Response iteration exceeded
the configured threshold for request [%s] - %s", msg.getRequestId(), ex.getMessage());
                         logger.warn(errorMessage);
@@ -382,8 +388,6 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
                         onError(graph, context);
                         return;
                     }
-
-                    onTraversalSuccess(graph, context);
                 } catch (Exception ex) {
                     logger.warn(String.format("Exception processing a Traversal on request
[%s].", msg.getRequestId()), ex);
                     ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
@@ -444,4 +448,140 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
 
         return metaData;
     }
+
+    protected void handleIterator(final Context context, final Iterator itty, final Graph graph) throws TimeoutException, InterruptedException {
+        final ChannelHandlerContext ctx = context.getChannelHandlerContext();
+        final RequestMessage msg = context.getRequestMessage();
+        final Settings settings = context.getSettings();
+        final MessageSerializer serializer = ctx.channel().attr(StateKey.SERIALIZER).get();
+        final boolean useBinary = ctx.channel().attr(StateKey.USE_BINARY).get();
+        boolean warnOnce = false;
+
+
+        // we have an empty iterator - happens on stuff like: g.V().iterate()
+        if (!itty.hasNext()) {
+            // as there is nothing left to iterate if we are transaction managed then we should execute a
+            // commit here before we send back a NO_CONTENT which implies success
+            onTraversalSuccess(graph, context);
+            ctx.writeAndFlush(ResponseMessage.build(msg)
+                    .code(ResponseStatusCode.NO_CONTENT)
+                    .create());
+            return;
+        }
+
+        // timer for the total serialization time
+        final StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+
+        // the batch size can be overridden by the request
+        final int resultIterationBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
+                .orElse(settings.resultIterationBatchSize);
+        List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
+
+        // use an external control to manage the loop as opposed to just checking hasNext() in the while.  this
+        // prevent situations where auto transactions create a new transaction after calls to commit() withing
+        // the loop on calls to hasNext().
+        boolean hasMore = itty.hasNext();
+
+        while (hasMore) {
+            if (Thread.interrupted()) throw new InterruptedException();
+
+            // check if an implementation needs to force flush the aggregated results before the iteration batch
+            // size is reached.
+            final boolean forceFlush = isForceFlushed(ctx, msg, itty);
+
+            // have to check the aggregate size because it is possible that the channel is not writeable (below)
+            // so iterating next() if the message is not written and flushed would bump the aggregate size beyond
+            // the expected resultIterationBatchSize.  Total serialization time for the response remains in
+            // effect so if the client is "slow" it may simply timeout.
+            //
+            // there is a need to check hasNext() on the iterator because if the channel is not writeable the
+            // previous pass through the while loop will have next()'d the iterator and if it is "done" then a
+            // NoSuchElementException will raise its head. also need a check to ensure that this iteration doesn't
+            // require a forced flush which can be forced by sub-classes.
+            //
+            // this could be placed inside the isWriteable() portion of the if-then below but it seems better to
+            // allow iteration to continue into a batch if that is possible rather than just doing nothing at all
+            // while waiting for the client to catch up
+            if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) aggregate.add(itty.next());
+
+            // send back a page of results if batch size is met or if it's the end of the results being iterated.
+            // also check writeability of the channel to prevent OOME for slow clients.
+            if (ctx.channel().isWritable()) {
+                if (forceFlush || aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
+                    final ResponseStatusCode code = itty.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
+
+                    // serialize here because in sessionless requests the serialization must occur in the same
+                    // thread as the eval.  as eval occurs in the GremlinExecutor there's no way to get back to the
+                    // thread that processed the eval of the script so, we have to push serialization down into that
+                    Frame frame = null;
+                    try {
+                        frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code, generateMetaData(ctx, msg, code, itty));
+                    } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+
+                        // exception is handled in makeFrame() - serialization error gets written back to driver
+                        // at that point
+                        onError(graph, context);
+                        break;
+                    }
+
+                    try {
+                        // only need to reset the aggregation list if there's more stuff to write
+                        if (itty.hasNext())
+                            aggregate = new ArrayList<>(resultIterationBatchSize);
+                        else {
+                            // iteration and serialization are both complete which means this finished successfully. note that
+                            // errors internal to script eval or timeout will rollback given GremlinServer's global configurations.
+                            // local errors will get rolledback below because the exceptions aren't thrown in those cases to be
+                            // caught by the GremlinExecutor for global rollback logic. this only needs to be committed if
+                            // there are no more items to iterate and serialization is complete
+                            onTraversalSuccess(graph, context);
+
+                            // exit the result iteration loop as there are no more results left.  using this external control
+                            // because of the above commit.  some graphs may open a new transaction on the call to
+                            // hasNext()
+                            hasMore = false;
+                        }
+                    } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+                        throw ex;
+                    }
+
+                    iterateComplete(ctx, msg, itty);
+
+                    // the flush is called after the commit has potentially occurred.  in this way, if a commit was
+                    // required then it will be 100% complete before the client receives it. the "frame" at this point
+                    // should have completely detached objects from the transaction (i.e. serialization has occurred)
+                    // so a new one should not be opened on the flush down the netty pipeline
+                    ctx.writeAndFlush(frame);
+                }
+            } else {
+                // don't keep triggering this warning over and over again for the same request
+                if (!warnOnce) {
+                    logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg);
+                    warnOnce = true;
+                }
+
+                // since the client is lagging we can hold here for a period of time for the client to catch up.
+                // this isn't blocking the IO thread - just a worker.
+                TimeUnit.MILLISECONDS.sleep(10);
+            }
+
+            stopWatch.split();
+            if (settings.serializedResponseTimeout > 0 && stopWatch.getSplitTime() > settings.serializedResponseTimeout) {
+                final String timeoutMsg = String.format("Serialization of the entire response exceeded the 'serializeResponseTimeout' setting %s",
+                        warnOnce ? "[Gremlin Server paused writes to client as messages were not being consumed quickly enough]" : "");
+                throw new TimeoutException(timeoutMsg.trim());
+            }
+
+            stopWatch.unsplit();
+        }
+
+        stopWatch.stop();
+    }
 }


Mime
View raw message