tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [tinkerpop] branch tp4 updated: Made it so that Beam can return traversers while its pipeline is running.
Date Wed, 27 Mar 2019 00:51:08 GMT
This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 4fb4b90  Made it so that Beam can return traversers while its pipeline is running.
4fb4b90 is described below

commit 4fb4b90c910715ae1e4825af8bec1ff5606e5c0a
Author: Marko A. Rodriguez <okrammarko@gmail.com>
AuthorDate: Tue Mar 26 18:50:51 2019 -0600

    Made it so that Beam can return traversers while its pipeline is running.
---
 .../apache/tinkerpop/machine/processor/beam/Beam.java  | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
index fd0bf65..8cb260a 100644
--- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
+++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.machine.processor.beam;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -44,8 +45,10 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
     private boolean createTraverserServer;
     private final int traverserServerPort;
     private final Pipeline pipeline;
+    private PipelineResult pipelineResult;
     private Iterator<Traverser<C, E>> iterator = null;
 
+
     public Beam(final Compilation<C, S, E> compilation, final String traverserServerLocation, final int traverserServerPort, final boolean createTraverserServer) {
         this.traverserServerPort = traverserServerPort;
         this.createTraverserServer = createTraverserServer;
@@ -89,12 +92,15 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
 
     private void setupPipeline() {
         if (null == this.iterator) {
-            this.iterator = this.createTraverserServer ?
-                    new TraverserServer<>(this.traverserServerPort) :
-                    Collections.emptyIterator();
-            this.pipeline.run().waitUntilFinish();
-            if (this.iterator instanceof TraverserServer)
-                ((TraverserServer<C, E>) this.iterator).close();
+            if (this.createTraverserServer) {
+                this.iterator = new TraverserServer<>(this.traverserServerPort);
+                this.pipelineResult = this.pipeline.run();
+            } else {
+                this.iterator = Collections.emptyIterator();
+                this.pipeline.run().waitUntilFinish();
+            }
         }
+        if (this.createTraverserServer && this.pipelineResult.getState().isTerminal())
+            ((TraverserServer) this.iterator).close();
     }
 }


Mime
View raw message