beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: Check for Deferral on Non-additional inputs
Date Tue, 13 Jun 2017 03:21:13 GMT
Repository: beam
Updated Branches:
  refs/heads/master ce3dd4583 -> 32d55323c


Check for Deferral on Non-additional inputs

Because Side Inputs are represented within the expanded inputs, the
check that the transform is a Combine with Side Inputs would never be
hit. This ensures that we do not consider additional inputs during the
check to defer evaluation of the node.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ac18b2e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ac18b2e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ac18b2e

Branch: refs/heads/master
Commit: 1ac18b2eb1371422e60d50a8c3f37b3b24d59611
Parents: ce3dd45
Author: Thomas Groh <tgroh@google.com>
Authored: Mon Jun 12 16:55:59 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon Jun 12 16:55:59 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/spark/SparkRunner.java  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1ac18b2e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 9e2426e..d008718 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.metrics.AggregatorMetricSource;
@@ -359,10 +360,12 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
     protected boolean shouldDefer(TransformHierarchy.Node node) {
       // if the input is not a PCollection, or it is but with non merging windows, don't defer.
-      if (node.getInputs().size() != 1) {
+      Collection<PValue> nonAdditionalInputs =
+          TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline()));
+      if (nonAdditionalInputs.size() != 1) {
         return false;
       }
-      PValue input = Iterables.getOnlyElement(node.getInputs().values());
+      PValue input = Iterables.getOnlyElement(nonAdditionalInputs);
       if (!(input instanceof PCollection)
           || ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
         return false;


Mime
View raw message