beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: Remove Pipeline reference from TransformHierarchy
Date Mon, 22 May 2017 22:16:54 GMT
Repository: beam
Updated Branches:
  refs/heads/master 983a44926 -> b633abe2c


Remove Pipeline reference from TransformHierarchy

This change removes a direct dependency cycle between Pipeline and
TransformHierarchy. There is still an indirect cycle through PValues, but that
is slightly less problematic.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e9fcebc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e9fcebc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e9fcebc

Branch: refs/heads/master
Commit: 5e9fcebc07725de368391914781e5b4d5f9c4a19
Parents: d7a4e49
Author: Kenneth Knowles <klk@google.com>
Authored: Fri May 19 12:57:41 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Sat May 20 08:33:21 2017 -0700

----------------------------------------------------------------------
 .../translation/ApexPipelineTranslator.java     |  4 +-
 .../apex/translation/TranslationContext.java    |  5 +-
 .../core/construction/SdkComponents.java        | 14 +++---
 .../core/construction/SdkComponentsTest.java    |  7 +--
 .../beam/runners/direct/DirectGraphVisitor.java |  3 +-
 .../direct/KeyedPValueTrackingVisitor.java      |  5 +-
 .../flink/FlinkBatchPipelineTranslator.java     |  2 +-
 .../apache/beam/runners/flink/FlinkRunner.java  | 10 +---
 .../flink/FlinkStreamingPipelineTranslator.java |  4 +-
 .../dataflow/DataflowPipelineTranslator.java    |  6 +--
 .../beam/runners/dataflow/DataflowRunner.java   |  2 +-
 .../apache/beam/runners/spark/SparkRunner.java  |  2 +-
 .../streaming/TrackStreamingSourcesTest.java    | 14 +++++-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 50 ++++++++++++++++----
 .../beam/sdk/runners/TransformHierarchy.java    |  6 +--
 .../sdk/runners/TransformHierarchyTest.java     |  2 +-
 16 files changed, 82 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 32e470f..bda074b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
  * into Apex logical plan {@link DAG}.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
-public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
+public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
   private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class);
 
   /**
@@ -110,7 +110,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor
{
       throw new UnsupportedOperationException(
           "no translator registered for " + transform);
     }
-    translationContext.setCurrentTransform(node);
+    translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
     translator.translate(transform, translationContext);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index a5e3028..aff3863 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -36,7 +36,6 @@ import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -77,8 +76,8 @@ class TranslationContext {
     this.pipelineOptions = pipelineOptions;
   }
 
-  public void setCurrentTransform(TransformHierarchy.Node treeNode) {
-    this.currentTransform = treeNode.toAppliedPTransform();
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+    this.currentTransform = transform;
   }
 
   public ApexPipelineOptions getPipelineOptions() {

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index eb29b9a..5714fc5 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -62,10 +62,10 @@ class SdkComponents {
     return new SdkComponents();
   }
 
-  public static RunnerApi.Pipeline translatePipeline(Pipeline p) {
+  public static RunnerApi.Pipeline translatePipeline(Pipeline pipeline) {
     final SdkComponents components = create();
     final Collection<String> rootIds = new HashSet<>();
-    p.traverseTopologically(
+    pipeline.traverseTopologically(
         new PipelineVisitor.Defaults() {
           private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children
=
               ArrayListMultimap.create();
@@ -77,9 +77,10 @@ class SdkComponents {
                 rootIds.add(components.getExistingPTransformId(pipelineRoot));
               }
             } else {
-              children.put(node.getEnclosingNode(), node.toAppliedPTransform());
+              children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline()));
               try {
-                components.registerPTransform(node.toAppliedPTransform(), children.get(node));
+                components.registerPTransform(
+                    node.toAppliedPTransform(getPipeline()), children.get(node));
               } catch (IOException e) {
                 throw new RuntimeException(e);
               }
@@ -88,10 +89,11 @@ class SdkComponents {
 
           @Override
           public void visitPrimitiveTransform(Node node) {
-            children.put(node.getEnclosingNode(), node.toAppliedPTransform());
+            children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline()));
             try {
               components.registerPTransform(
-                  node.toAppliedPTransform(), Collections.<AppliedPTransform<?, ?,
?>>emptyList());
+                  node.toAppliedPTransform(getPipeline()),
+                  Collections.<AppliedPTransform<?, ?, ?>>emptyList());
             } catch (IOException e) {
               throw new IllegalStateException(e);
             }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index 7424886..55702ea 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -97,18 +97,13 @@ public class SdkComponentsTest {
 
     final RunnerApi.Pipeline pipelineProto = SdkComponents.translatePipeline(pipeline);
     pipeline.traverseTopologically(
-        new PipelineVisitor() {
+        new PipelineVisitor.Defaults() {
           Set<Node> transforms = new HashSet<>();
           Set<PCollection<?>> pcollections = new HashSet<>();
           Set<Equivalence.Wrapper<? extends Coder<?>>> coders = new HashSet<>();
           Set<WindowingStrategy<?, ?>> windowingStrategies = new HashSet<>();
 
           @Override
-          public CompositeBehavior enterCompositeTransform(Node node) {
-            return CompositeBehavior.ENTER_TRANSFORM;
-          }
-
-          @Override
           public void leaveCompositeTransform(Node node) {
             if (node.isRootNode()) {
               assertThat(

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 1ee8ceb..01204e3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.PValue;
  * input after the upstream transform has produced and committed output.
  */
 class DirectGraphVisitor extends PipelineVisitor.Defaults {
+
   private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
 
   private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers
=
@@ -101,7 +102,7 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
   private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node)
{
     @SuppressWarnings({"rawtypes", "unchecked"})
-    AppliedPTransform<?, ?, ?> application = node.toAppliedPTransform();
+    AppliedPTransform<?, ?, ?> application = node.toAppliedPTransform(getPipeline());
     return application;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 347f313..f9b2dae 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -44,7 +44,7 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 // TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms
 // unkeyed
-class KeyedPValueTrackingVisitor implements PipelineVisitor {
+class KeyedPValueTrackingVisitor extends PipelineVisitor.Defaults {
 
   private static final Set<Class<? extends PTransform>> PRODUCES_KEYED_OUTPUTS
=
       ImmutableSet.of(
@@ -91,9 +91,6 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
   }
 
   @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {}
-
-  @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {
     boolean inputsAreKeyed = true;
     for (PValue input : producer.getInputs().values()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
index 854b674..50910b5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
@@ -112,7 +112,7 @@ class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
     BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>)
translator;
 
     // create the applied PTransform on the batchContext
-    batchContext.setCurrentTransform(node.toAppliedPTransform());
+    batchContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
     typedTranslator.translateNode(typedTransform, batchContext);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 80ef7bb..ca12615 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.program.DetachedEnvironment;
 import org.slf4j.Logger;
@@ -199,10 +198,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult>
{
     // have just recorded the full names during apply time.
     if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
       final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new
TreeSet<>();
-      pipeline.traverseTopologically(new Pipeline.PipelineVisitor() {
-        @Override
-        public void visitValue(PValue value, TransformHierarchy.Node producer) {
-        }
+      pipeline.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {
 
         @Override
         public void visitPrimitiveTransform(TransformHierarchy.Node node) {
@@ -218,10 +214,6 @@ public class FlinkRunner extends PipelineRunner<PipelineResult>
{
           }
           return CompositeBehavior.ENTER_TRANSFORM;
         }
-
-        @Override
-        public void leaveCompositeTransform(TransformHierarchy.Node node) {
-        }
       });
 
       LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for
{} "

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 53a1fa1..8da68c5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -188,7 +188,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator
{
     StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>)
translator;
 
     // create the applied PTransform on the streamingContext
-    streamingContext.setCurrentTransform(node.toAppliedPTransform());
+    streamingContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
     typedTranslator.translateNode(typedTransform, streamingContext);
   }
 
@@ -203,7 +203,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator
{
     @SuppressWarnings("unchecked")
     StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>)
translator;
 
-    streamingContext.setCurrentTransform(node.toAppliedPTransform());
+    streamingContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
 
     return typedTranslator.canTranslate(typedTransform, streamingContext);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 840bda8..6d7a0f8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -431,18 +431,18 @@ public class DataflowPipelineTranslator {
           transform,
           node.getFullName());
       LOG.debug("Translating {}", transform);
-      currentTransform = node.toAppliedPTransform();
+      currentTransform = node.toAppliedPTransform(getPipeline());
       translator.translate(transform, this);
       currentTransform = null;
     }
 
     @Override
     public void visitValue(PValue value, TransformHierarchy.Node producer) {
-      producers.put(value, producer.toAppliedPTransform());
+      producers.put(value, producer.toAppliedPTransform(getPipeline()));
       LOG.debug("Checking translation of {}", value);
       if (!producer.isCompositeNode()) {
         // Primitive transforms are the only ones assigned step names.
-        asOutputReference(value, producer.toAppliedPTransform());
+        asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 2ef8737..cce6ce7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -729,7 +729,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
     if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
       final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new
TreeSet<>();
       pipeline.traverseTopologically(
-          new PipelineVisitor() {
+          new PipelineVisitor.Defaults() {
             @Override
             public void visitValue(PValue value, TransformHierarchy.Node producer) {}
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 8c02f0f..9e2426e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -404,7 +404,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult>
{
       @SuppressWarnings("unchecked")
       TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass);
       LOG.info("Evaluating {}", transform);
-      AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform();
+      AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(getPipeline());
       ctxt.setCurrentTransform(appliedTransform);
       evaluator.evaluate(transform, ctxt);
       ctxt.setCurrentTransform(null);

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
index 33a636a..e8a5951 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
@@ -148,6 +148,12 @@ public class TrackStreamingSourcesTest {
     }
 
     @Override
+    public void enterPipeline(Pipeline p) {
+      super.enterPipeline(p);
+      evaluator.enterPipeline(p);
+    }
+
+    @Override
     public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
       return evaluator.enterCompositeTransform(node);
     }
@@ -156,7 +162,7 @@ public class TrackStreamingSourcesTest {
     public void visitPrimitiveTransform(TransformHierarchy.Node node) {
       PTransform transform = node.getTransform();
       if (transform.getClass() == transformClassToAssert) {
-        AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform();
+        AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(getPipeline());
         ctxt.setCurrentTransform(appliedTransform);
         //noinspection unchecked
         Dataset dataset = ctxt.borrowDataset((PTransform<? extends PValue, ?>) transform);
@@ -166,6 +172,12 @@ public class TrackStreamingSourcesTest {
         evaluator.visitPrimitiveTransform(node);
       }
     }
+
+    @Override
+    public void leavePipeline(Pipeline p) {
+      super.leavePipeline(p);
+      evaluator.leavePipeline(p);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 83496a5..bdf8a12 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -205,7 +206,7 @@ public class Pipeline {
           public CompositeBehavior enterCompositeTransform(Node node) {
             if (!node.isRootNode()) {
               for (PTransformOverride override : overrides) {
-                if (override.getMatcher().matches(node.toAppliedPTransform())) {
+                if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline())))
{
                   matched.put(node, override);
                 }
               }
@@ -227,7 +228,7 @@ public class Pipeline {
           @Override
           public void visitPrimitiveTransform(Node node) {
             for (PTransformOverride override : overrides) {
-              if (override.getMatcher().matches(node.toAppliedPTransform())) {
+              if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline())))
{
                 matched.put(node, override);
               }
             }
@@ -238,7 +239,7 @@ public class Pipeline {
   private void replace(final PTransformOverride override) {
     final Set<Node> matches = new HashSet<>();
     final Set<Node> freedNodes = new HashSet<>();
-    transforms.visit(
+    traverseTopologically(
         new PipelineVisitor.Defaults() {
           @Override
           public CompositeBehavior enterCompositeTransform(Node node) {
@@ -247,7 +248,8 @@ public class Pipeline {
               freedNodes.add(node);
               return CompositeBehavior.ENTER_TRANSFORM;
             }
-            if (!node.isRootNode() && override.getMatcher().matches(node.toAppliedPTransform()))
{
+            if (!node.isRootNode()
+                && override.getMatcher().matches(node.toAppliedPTransform(getPipeline())))
{
               matches.add(node);
               // This node will be freed. When we visit any of its children, they will also
be freed
               freedNodes.add(node);
@@ -259,7 +261,7 @@ public class Pipeline {
           public void visitPrimitiveTransform(Node node) {
             if (freedNodes.contains(node.getEnclosingNode())) {
               freedNodes.add(node);
-            } else if (override.getMatcher().matches(node.toAppliedPTransform())) {
+            } else if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline())))
{
               matches.add(node);
               freedNodes.add(node);
             }
@@ -334,8 +336,14 @@ public class Pipeline {
   @Internal
   public interface PipelineVisitor {
     /**
-     * Called for each composite transform after all topological predecessors have been visited
-     * but before any of its component transforms.
+     * Called before visiting anything values or transforms, as many uses of a visitor require
+     * access to the {@link Pipeline} object itself.
+     */
+    void enterPipeline(Pipeline p);
+
+    /**
+     * Called for each composite transform after all topological predecessors have been visited
but
+     * before any of its component transforms.
      *
      * <p>The return value controls whether or not child transforms are visited.
      */
@@ -360,6 +368,11 @@ public class Pipeline {
     void visitValue(PValue value, TransformHierarchy.Node producer);
 
     /**
+     * Called when all values and transforms in a {@link Pipeline} have been visited.
+     */
+    void leavePipeline(Pipeline pipeline);
+
+    /**
      * Control enum for indicating whether or not a traversal should process the contents
of
      * a composite transform or not.
      */
@@ -373,6 +386,18 @@ public class Pipeline {
      * User implementations can override just those methods they are interested in.
      */
     class Defaults implements PipelineVisitor {
+
+      private Pipeline pipeline;
+
+      protected Pipeline getPipeline() {
+        return pipeline;
+      }
+
+      @Override
+      public void enterPipeline(Pipeline pipeline) {
+        this.pipeline = checkNotNull(pipeline);
+      }
+
       @Override
       public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
         return CompositeBehavior.ENTER_TRANSFORM;
@@ -386,6 +411,11 @@ public class Pipeline {
 
       @Override
       public void visitValue(PValue value, TransformHierarchy.Node producer) { }
+
+      @Override
+      public void leavePipeline(Pipeline pipeline) {
+        this.pipeline = null;
+      }
     }
   }
 
@@ -406,7 +436,9 @@ public class Pipeline {
    */
   @Internal
   public void traverseTopologically(PipelineVisitor visitor) {
+    visitor.enterPipeline(this);
     transforms.visit(visitor);
+    visitor.leavePipeline(this);
   }
 
   /**
@@ -444,7 +476,7 @@ public class Pipeline {
   /////////////////////////////////////////////////////////////////////////////
   // Below here are internal operations, never called by users.
 
-  private final TransformHierarchy transforms = new TransformHierarchy(this);
+  private final TransformHierarchy transforms = new TransformHierarchy();
   private Set<String> usedFullNames = new HashSet<>();
   private CoderRegistry coderRegistry;
   private final List<String> unstableNames = new ArrayList<>();
@@ -495,7 +527,7 @@ public class Pipeline {
           PTransformOverrideFactory<InputT, OutputT, TransformT> replacementFactory)
{
     PTransformReplacement<InputT, OutputT> replacement =
         replacementFactory.getReplacementTransform(
-            (AppliedPTransform<InputT, OutputT, TransformT>) original.toAppliedPTransform());
+            (AppliedPTransform<InputT, OutputT, TransformT>) original.toAppliedPTransform(this));
     if (replacement.getTransform() == original.getTransform()) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index fac558b..2f0e8ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
 public class TransformHierarchy {
   private static final Logger LOG = LoggerFactory.getLogger(TransformHierarchy.class);
 
-  private final Pipeline pipeline;
   private final Node root;
   private final Map<Node, PInput> unexpandedInputs;
   private final Map<POutput, Node> producers;
@@ -65,8 +64,7 @@ public class TransformHierarchy {
   // Maintain a stack based on the enclosing nodes
   private Node current;
 
-  public TransformHierarchy(Pipeline pipeline) {
-    this.pipeline = pipeline;
+  public TransformHierarchy() {
     producers = new HashMap<>();
     producerInput = new HashMap<>();
     unexpandedInputs = new HashMap<>();
@@ -453,7 +451,7 @@ public class TransformHierarchy {
     /**
      * Returns the {@link AppliedPTransform} representing this {@link Node}.
      */
-    public AppliedPTransform<?, ?, ?> toAppliedPTransform() {
+    public AppliedPTransform<?, ?, ?> toAppliedPTransform(Pipeline pipeline) {
       return AppliedPTransform.of(
           getFullName(), inputs, outputs, (PTransform) getTransform(), pipeline);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 125e159..1197d1b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -79,7 +79,7 @@ public class TransformHierarchyTest implements Serializable {
 
   @Before
   public void setup() {
-    hierarchy = new TransformHierarchy(pipeline);
+    hierarchy = new TransformHierarchy();
   }
 
   @Test


Mime
View raw message