beam-commits mailing list archives

From: tg...@apache.org
Subject: [2/3] beam git commit: Revert "Visit a Transform Hierarchy in Topological Order"
Date: Fri, 26 May 2017 20:19:06 GMT
Revert "Visit a Transform Hierarchy in Topological Order"

This reverts commit bd1dfdf3c8e145a99bcacebd0c64dcf6580f3ffe.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ad6433e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ad6433e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ad6433e

Branch: refs/heads/master
Commit: 6ad6433ec0c02aec8656e9e3b27f6e0f974f8ece
Parents: 247f9bc
Author: Thomas Groh <tgroh@google.com>
Authored: Fri May 26 11:04:05 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Fri May 26 13:18:55 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  | 13 ---
 .../spark/translation/BoundedDataset.java       |  6 --
 .../spark/translation/TransformTranslator.java  |  1 -
 .../spark/translation/StorageLevelTest.java     |  4 +-
 .../beam/sdk/runners/TransformHierarchy.java    | 46 +----------
 .../sdk/runners/TransformHierarchyTest.java     | 86 --------------------
 6 files changed, 6 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 1be5e13..9e2426e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -21,10 +21,8 @@ package org.apache.beam.runners.spark;
 import com.google.common.collect.Iterables;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -51,7 +49,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -339,8 +336,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     protected final EvaluationContext ctxt;
     protected final SparkPipelineTranslator translator;
 
-    private final Set<Node> shouldIgnoreChildren = new HashSet<>();
-
     public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) {
       this.translator = translator;
       this.ctxt = ctxt;
@@ -356,7 +351,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
           LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
           LOG.debug("Composite transform class: '{}'", transformClass);
           doVisitTransform(node);
-          shouldIgnoreChildren.add(node);
           return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
         }
       }
@@ -398,13 +392,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
     @Override
     public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-      Node parent = node.getEnclosingNode();
-      while (!parent.isRootNode()) {
-        if (shouldIgnoreChildren.contains(parent)) {
-          return;
-        }
-        parent = parent.getEnclosingNode();
-      }
       doVisitTransform(node);
     }
 

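Note: with the depth-first traversal restored below in TransformHierarchy.java, returning
DO_NOT_ENTER_TRANSFORM from enterCompositeTransform is by itself enough to keep a
directly-translated composite's children from being visited; the shouldIgnoreChildren set
deleted above only existed to compensate for the reverted topological traversal, which
could still reach those children through input producers. A minimal sketch of the pattern
(hypothetical visitor, not the Beam source):

    import org.apache.beam.sdk.Pipeline.PipelineVisitor;
    import org.apache.beam.sdk.runners.TransformHierarchy;

    class DirectTranslationVisitor extends PipelineVisitor.Defaults {
      @Override
      public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
        if (canTranslate(node)) {
          translate(node); // handle the entire composite in one step
          // Under depth-first traversal this alone keeps all children unvisited.
          return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
        }
        return CompositeBehavior.ENTER_TRANSFORM;
      }

      // Hypothetical stand-ins for runner-specific translation logic.
      private boolean canTranslate(TransformHierarchy.Node node) { return false; }
      private void translate(TransformHierarchy.Node node) {}
    }
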
http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index a746634..652c753 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -19,7 +19,6 @@
 package org.apache.beam.runners.spark.translation;
 
 import com.google.common.base.Function;
-import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import javax.annotation.Nullable;
@@ -98,13 +97,8 @@ public class BoundedDataset<T> implements Dataset {
     return windowedValues;
   }
 
-  int timesCached = 0;
   @Override
   public void cache(String storageLevel) {
-    System.out.printf(
-        "Persisting Dataset %s for RDD %s (id %s) at level %s. %s times before%n",
-        this, getRDD(), getRDD().toDebugString(), storageLevel, timesCached++);
-    System.out.println(Joiner.on("\n\t").join(new Throwable().getStackTrace()));
     // populate the rdd if needed
     getRDD().persist(StorageLevel.fromString(storageLevel));
   }

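Note: with the instrumentation removed, cache(String) reduces to mapping a level name
onto a Spark StorageLevel and marking the RDD for persistence; Spark only materializes
the cache the first time an action computes the RDD. A minimal standalone sketch
(hypothetical helper, plain Spark API):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.storage.StorageLevel;

    final class Persistence {
      // Marks the RDD for caching at the named level, e.g. "MEMORY_ONLY" or
      // "MEMORY_AND_DISK". persist() is lazy: nothing is stored until an
      // action first computes the RDD's partitions.
      static <T> JavaRDD<T> persistAtLevel(JavaRDD<T> rdd, String storageLevel) {
        return rdd.persist(StorageLevel.fromString(storageLevel));
      }
    }
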
http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 6ca12c9..742ea83 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -407,7 +407,6 @@ public final class TransformTranslator {
         JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>(
             jsc.sc(), transform.getSource(), runtimeContext, stepName).toJavaRDD();
         // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
-        System.out.println("Evaluating Bounded Read " + transform);
         context.putDataset(transform, new BoundedDataset<>(input.cache()));
       }
 

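Note: the surviving input.cache() call above is what prevents Spark's lazy DAG evaluation
from re-running the bounded source once per downstream action. A self-contained
illustration of that behavior (hypothetical demo, not Beam code):

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CacheDemo {
      public static void main(String[] args) {
        JavaSparkContext jsc =
            new JavaSparkContext(new SparkConf().setMaster("local").setAppName("cache-demo"));
        // cache() is shorthand for persist(MEMORY_ONLY); without it, every
        // action below would recompute the source lineage from scratch.
        JavaRDD<Integer> source = jsc.parallelize(Arrays.asList(1, 2, 3)).cache();
        long first = source.count();   // triggers computation and fills the cache
        long second = source.count();  // served from the cached partitions
        jsc.stop();
      }
    }
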
http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
index 8bd6dae..8f2e681 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -58,12 +58,12 @@ public class StorageLevelTest {
 
   @Test
   public void test() throws Exception {
-    PCollection<String> pCollection = pipeline.apply("CreateFoo", Create.of("foo"));
+    PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
 
     // by default, the Spark runner doesn't cache the RDD if it accessed only one time.
     // So, to "force" the caching of the RDD, we have to call the RDD at least two time.
     // That's why we are using Count fn on the PCollection.
-    pCollection.apply("CountAll", Count.<String>globally());
+    pCollection.apply(Count.<String>globally());
 
     PCollection<String> output = pCollection.apply(new StorageLevelPTransform());
 

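Note: the test change above only drops the explicit step names. Both spellings are valid:
a named apply pins a stable step name (which should be unique within the pipeline), while
an unnamed apply lets Beam derive one from the transform. A short sketch of the two forms
(hypothetical fragment):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    class ApplyNaming {
      static void build(Pipeline pipeline) {
        // Named application: the step appears as "CreateFoo" in the graph.
        PCollection<String> pc = pipeline.apply("CreateFoo", Create.of("foo"));
        // Unnamed application: Beam derives the step name from the transform.
        pc.apply(Count.<String>globally());
      }
    }
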
http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 630d24c..2f0e8ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -202,12 +202,10 @@ public class TransformHierarchy {
     return producers.get(produced);
   }
 
-  int traversed = 0;
   public Set<PValue> visit(PipelineVisitor visitor) {
     finishSpecifying();
     Set<PValue> visitedValues = new HashSet<>();
-    traversed++;
-    root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>());
+    root.visit(visitor, visitedValues);
     return visitedValues;
   }
 
@@ -464,22 +462,7 @@ public class TransformHierarchy {
      * <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for
      * composite transforms), then the output values.
      */
-    private void visit(
-        PipelineVisitor visitor,
-        Set<PValue> visitedValues,
-        Set<Node> visitedNodes,
-        Set<Node> passedComposites) {
-      if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) {
-        getEnclosingNode().visit(visitor, visitedValues, visitedNodes, passedComposites);
-      }
-      if (!visitedNodes.add(this)) {
-        LOG.debug("Not revisiting previously visited node {}", this);
-        return;
-      } else if (childNodeOf(passedComposites)) {
-        LOG.debug("Not revisiting Node {} which is a child of a previously passed composite",
this);
-        return;
-      }
-
+    private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {
       if (!finishedSpecifying) {
         finishSpecifying();
       }
@@ -487,31 +470,22 @@ public class TransformHierarchy {
       if (!isRootNode()) {
         // Visit inputs.
         for (PValue inputValue : inputs.values()) {
-          Node valueProducer = getProducer(inputValue);
-          if (!visitedNodes.contains(valueProducer)) {
-            valueProducer.visit(visitor, visitedValues, visitedNodes, passedComposites);
-          }
           if (visitedValues.add(inputValue)) {
-            LOG.debug("Visiting input value {}", inputValue);
-            visitor.visitValue(inputValue, valueProducer);
+            visitor.visitValue(inputValue, getProducer(inputValue));
           }
         }
       }
 
       if (isCompositeNode()) {
-        LOG.debug("Visiting composite node {}", this);
         PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this);
 
         if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
           for (Node child : parts) {
-            child.visit(visitor, visitedValues, visitedNodes, passedComposites);
+            child.visit(visitor, visitedValues);
           }
-        } else {
-          passedComposites.add(this);
         }
         visitor.leaveCompositeTransform(this);
       } else {
-        LOG.debug("Visiting primitive node {}", this);
         visitor.visitPrimitiveTransform(this);
       }
 
@@ -520,24 +494,12 @@ public class TransformHierarchy {
         // Visit outputs.
         for (PValue pValue : outputs.values()) {
           if (visitedValues.add(pValue)) {
-            LOG.debug("Visiting output value {}", pValue);
             visitor.visitValue(pValue, this);
           }
         }
       }
     }
 
-    private boolean childNodeOf(Set<Node> nodes) {
-      if (isRootNode()) {
-        return false;
-      }
-      Node parent = this.getEnclosingNode();
-      while (!parent.isRootNode() && !nodes.contains(parent)) {
-        parent = parent.getEnclosingNode();
-      }
-      return nodes.contains(parent);
-    }
-
     /**
      * Finish specifying a transform.
      *

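Note: condensed from the hunks above, the visit method after this revert is a single
depth-first recursion, visiting each node's inputs, then the node itself (or its
children, when the visitor chooses to enter a composite), then its outputs. A simplified
reading (abridged; the finishSpecifying check is elided):

    private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {
      if (!isRootNode()) {
        for (PValue inputValue : inputs.values()) {
          if (visitedValues.add(inputValue)) {
            visitor.visitValue(inputValue, getProducer(inputValue));
          }
        }
      }
      if (isCompositeNode()) {
        PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this);
        if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
          for (Node child : parts) {
            child.visit(visitor, visitedValues); // recurse in nesting order
          }
        }
        visitor.leaveCompositeTransform(this);
      } else {
        visitor.visitPrimitiveTransform(this);
      }
      if (!isRootNode()) {
        for (PValue pValue : outputs.values()) {
          if (visitedValues.add(pValue)) {
            visitor.visitValue(pValue, this);
          }
        }
      }
    }
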
http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 2fe2817..1197d1b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.runners;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
@@ -33,8 +32,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
@@ -495,87 +492,4 @@ public class TransformHierarchyTest implements Serializable {
     assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode));
     assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output));
   }
-
-  @Test
-  public void visitIsTopologicallyOrdered() {
-    PCollection<String> one =
-        PCollection.<String>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
-            .setCoder(StringUtf8Coder.of());
-    final PCollection<Integer> two =
-        PCollection.<Integer>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-            .setCoder(VarIntCoder.of());
-    final PDone done = PDone.in(pipeline);
-    final TupleTag<String> oneTag = new TupleTag<String>() {};
-    final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
-    final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two);
-
-    hierarchy.pushNode("consumes_both", one, new PTransform<PCollection<String>,
PDone>() {
-      @Override
-      public PDone expand(PCollection<String> input) {
-        return done;
-      }
-
-      @Override
-      public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-        return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two);
-      }
-    });
-    hierarchy.setOutput(done);
-    hierarchy.popNode();
-
-    final PTransform<PBegin, PCollectionTuple> producer =
-        new PTransform<PBegin, PCollectionTuple>() {
-          @Override
-          public PCollectionTuple expand(PBegin input) {
-            return oneAndTwo;
-          }
-        };
-    hierarchy.pushNode(
-        "encloses_producer",
-        PBegin.in(pipeline),
-        new PTransform<PBegin, PCollectionTuple>() {
-          @Override
-          public PCollectionTuple expand(PBegin input) {
-            return input.apply(producer);
-          }
-        });
-    hierarchy.pushNode(
-        "creates_one_and_two",
-        PBegin.in(pipeline), producer);
-    hierarchy.setOutput(oneAndTwo);
-    hierarchy.popNode();
-    hierarchy.setOutput(oneAndTwo);
-    hierarchy.popNode();
-
-    hierarchy.visit(new PipelineVisitor.Defaults() {
-      private final Set<Node> visitedNodes = new HashSet<>();
-      private final Set<PValue> visitedValues = new HashSet<>();
-      @Override
-      public CompositeBehavior enterCompositeTransform(Node node) {
-        for (PValue input : node.getInputs().values()) {
-          assertThat(visitedValues, hasItem(input));
-        }
-        visitedNodes.add(node);
-        return CompositeBehavior.ENTER_TRANSFORM;
-      }
-
-      @Override
-      public void visitPrimitiveTransform(Node node) {
-        assertThat(visitedNodes, hasItem(node.getEnclosingNode()));
-        for (PValue input : node.getInputs().values()) {
-          assertThat(visitedValues, hasItem(input));
-        }
-        visitedNodes.add(node);
-      }
-
-      @Override
-      public void visitValue(PValue value, Node producer) {
-        assertThat(visitedNodes, hasItem(producer));
-        assertThat(visitedValues, not(hasItem(value)));
-        visitedValues.add(value);
-      }
-    });
-  }
 }

