beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [3/3] beam git commit: Revert "Roll-forward Include Additional PTransform inputs in Transform Nodes"
Date Fri, 26 May 2017 20:19:07 GMT
Revert "Roll-forward Include Additional PTransform inputs in Transform Nodes"

This reverts commit 2e2ae9cfa581a73864695d15102acadc2750a57a.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/247f9bc1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/247f9bc1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/247f9bc1

Branch: refs/heads/master
Commit: 247f9bc1581984d026764b3d433cb594e700bc21
Parents: c687887
Author: Thomas Groh <tgroh@google.com>
Authored: Fri May 26 11:00:40 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Fri May 26 13:18:55 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/TranslationContext.java    |   4 +-
 .../core/construction/TransformInputs.java      |  50 ------
 .../core/construction/TransformInputsTest.java  | 166 -------------------
 .../beam/runners/direct/DirectGraphVisitor.java |  15 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |   9 +-
 ...littableProcessElementsEvaluatorFactory.java |   2 -
 .../direct/StatefulParDoEvaluatorFactory.java   |   1 -
 .../beam/runners/direct/WatermarkManager.java   |  17 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   6 +-
 .../flink/FlinkBatchTranslationContext.java     |   3 +-
 .../flink/FlinkStreamingTranslationContext.java |   3 +-
 .../dataflow/DataflowPipelineTranslator.java    |   5 +-
 .../spark/translation/EvaluationContext.java    |   4 +-
 .../beam/sdk/runners/TransformHierarchy.java    |  28 +---
 14 files changed, 29 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index 94d13e1..aff3863 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -34,7 +34,6 @@ import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -94,8 +93,7 @@ class TranslationContext {
   }
 
   public <InputT extends PValue> InputT getInput() {
-    return (InputT)
-        Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
+    return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs().values());
   }
 
   public Map<TupleTag<?>, PValue> getOutputs() {

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java
deleted file mode 100644
index 2baf93a..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.ImmutableList;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/** Utilities for extracting subsets of inputs from an {@link AppliedPTransform}. */
-public class TransformInputs {
-  /**
-   * Gets all inputs of the {@link AppliedPTransform} that are not returned by {@link
-   * PTransform#getAdditionalInputs()}.
-   */
-  public static Collection<PValue> nonAdditionalInputs(AppliedPTransform<?, ?, ?>
application) {
-    ImmutableList.Builder<PValue> mainInputs = ImmutableList.builder();
-    PTransform<?, ?> transform = application.getTransform();
-    for (Map.Entry<TupleTag<?>, PValue> input : application.getInputs().entrySet())
{
-      if (!transform.getAdditionalInputs().containsKey(input.getKey())) {
-        mainInputs.add(input.getValue());
-      }
-    }
-    checkArgument(
-        !mainInputs.build().isEmpty() || application.getInputs().isEmpty(),
-        "Expected at least one main input if any inputs exist");
-    return mainInputs.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java
deleted file mode 100644
index f5b2c11..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static org.junit.Assert.assertThat;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link TransformInputs}. */
-@RunWith(JUnit4.class)
-public class TransformInputsTest {
-  @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void nonAdditionalInputsWithNoInputSucceeds() {
-    AppliedPTransform<PInput, POutput, TestTransform> transform =
-        AppliedPTransform.of(
-            "input-free",
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            new TestTransform(),
-            pipeline);
-
-    assertThat(TransformInputs.nonAdditionalInputs(transform), Matchers.<PValue>empty());
-  }
-
-  @Test
-  public void nonAdditionalInputsWithOneMainInputSucceeds() {
-    PCollection<Long> input = pipeline.apply(GenerateSequence.from(1L));
-    AppliedPTransform<PInput, POutput, TestTransform> transform =
-        AppliedPTransform.of(
-            "input-single",
-            Collections.<TupleTag<?>, PValue>singletonMap(new TupleTag<Long>()
{}, input),
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            new TestTransform(),
-            pipeline);
-
-    assertThat(
-        TransformInputs.nonAdditionalInputs(transform), Matchers.<PValue>containsInAnyOrder(input));
-  }
-
-  @Test
-  public void nonAdditionalInputsWithMultipleNonAdditionalInputsSucceeds() {
-    Map<TupleTag<?>, PValue> allInputs = new HashMap<>();
-    PCollection<Integer> mainInts = pipeline.apply("MainInput", Create.of(12, 3));
-    allInputs.put(new TupleTag<Integer>() {}, mainInts);
-    PCollection<Void> voids = pipeline.apply("VoidInput", Create.empty(VoidCoder.of()));
-    allInputs.put(new TupleTag<Void>() {}, voids);
-    AppliedPTransform<PInput, POutput, TestTransform> transform =
-        AppliedPTransform.of(
-            "additional-free",
-            allInputs,
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            new TestTransform(),
-            pipeline);
-
-    assertThat(
-        TransformInputs.nonAdditionalInputs(transform),
-        Matchers.<PValue>containsInAnyOrder(voids, mainInts));
-  }
-
-  @Test
-  public void nonAdditionalInputsWithAdditionalInputsSucceeds() {
-    Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
-    additionalInputs.put(new TupleTag<String>() {}, pipeline.apply(Create.of("1, 2",
"3")));
-    additionalInputs.put(new TupleTag<Long>() {}, pipeline.apply(GenerateSequence.from(3L)));
-
-    Map<TupleTag<?>, PValue> allInputs = new HashMap<>();
-    PCollection<Integer> mainInts = pipeline.apply("MainInput", Create.of(12, 3));
-    allInputs.put(new TupleTag<Integer>() {}, mainInts);
-    PCollection<Void> voids = pipeline.apply("VoidInput", Create.empty(VoidCoder.of()));
-    allInputs.put(
-        new TupleTag<Void>() {}, voids);
-    allInputs.putAll(additionalInputs);
-
-    AppliedPTransform<PInput, POutput, TestTransform> transform =
-        AppliedPTransform.of(
-            "additional",
-            allInputs,
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            new TestTransform(additionalInputs),
-            pipeline);
-
-    assertThat(
-        TransformInputs.nonAdditionalInputs(transform),
-        Matchers.<PValue>containsInAnyOrder(mainInts, voids));
-  }
-
-  @Test
-  public void nonAdditionalInputsWithOnlyAdditionalInputsThrows() {
-    Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
-    additionalInputs.put(new TupleTag<String>() {}, pipeline.apply(Create.of("1, 2",
"3")));
-    additionalInputs.put(new TupleTag<Long>() {}, pipeline.apply(GenerateSequence.from(3L)));
-
-    AppliedPTransform<PInput, POutput, TestTransform> transform =
-        AppliedPTransform.of(
-            "additional-only",
-            additionalInputs,
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            new TestTransform(additionalInputs),
-            pipeline);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("at least one");
-    TransformInputs.nonAdditionalInputs(transform);
-  }
-
-  private static class TestTransform extends PTransform<PInput, POutput> {
-    private final Map<TupleTag<?>, PValue> additionalInputs;
-
-    private TestTransform() {
-      this(Collections.<TupleTag<?>, PValue>emptyMap());
-    }
-
-    private TestTransform(Map<TupleTag<?>, PValue> additionalInputs) {
-      this.additionalInputs = additionalInputs;
-    }
-
-    @Override
-    public POutput expand(PInput input) {
-      return PDone.in(input.getPipeline());
-    }
-
-    @Override
-    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-      return additionalInputs;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index ed4282b..01204e3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -21,12 +21,10 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -36,8 +34,6 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue}
in the
@@ -45,7 +41,6 @@ import org.slf4j.LoggerFactory;
  * input after the upstream transform has produced and committed output.
  */
 class DirectGraphVisitor extends PipelineVisitor.Defaults {
-  private static final Logger LOG = LoggerFactory.getLogger(DirectGraphVisitor.class);
 
   private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
 
@@ -88,15 +83,7 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
     if (node.getInputs().isEmpty()) {
       rootTransforms.add(appliedTransform);
     } else {
-      Collection<PValue> mainInputs =
-          TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline()));
-      if (!mainInputs.containsAll(node.getInputs().values())) {
-        LOG.debug(
-            "Inputs reduced to {} from {} by removing additional inputs",
-            mainInputs,
-            node.getInputs().values());
-      }
-      for (PValue value : mainInputs) {
+      for (PValue value : node.getInputs().values()) {
         primitiveConsumers.put(value, appliedTransform);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 516f798..8aa75cf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterables;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -78,7 +79,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
         (TransformEvaluator<T>)
             createEvaluator(
                 (AppliedPTransform) application,
-                (PCollection<InputT>) inputBundle.getPCollection(),
                 inputBundle.getKey(),
                 doFn,
                 transform.getSideInputs(),
@@ -102,7 +102,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
   @SuppressWarnings({"unchecked", "rawtypes"})
   DoFnLifecycleManagerRemovingTransformEvaluator<InputT> createEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
-      PCollection<InputT> mainInput,
       StructuralKey<?> inputBundleKey,
       DoFn<InputT, OutputT> doFn,
       List<PCollectionView<?>> sideInputs,
@@ -121,7 +120,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
         createParDoEvaluator(
             application,
             inputBundleKey,
-            mainInput,
             sideInputs,
             mainOutputTag,
             additionalOutputTags,
@@ -134,7 +132,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
   ParDoEvaluator<InputT> createParDoEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
       StructuralKey<?> key,
-      PCollection<InputT> mainInput,
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
@@ -147,7 +144,8 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
           evaluationContext,
           stepContext,
           application,
-          mainInput.getWindowingStrategy(),
+          ((PCollection<InputT>) Iterables.getOnlyElement(application.getInputs().values()))
+              .getWindowingStrategy(),
           fn,
           key,
           sideInputs,
@@ -175,4 +173,5 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     }
     return pcs;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index eccc83a..b85f481c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -116,8 +116,6 @@ class SplittableProcessElementsEvaluatorFactory<
             delegateFactory.createParDoEvaluator(
                 application,
                 inputBundle.getKey(),
-                (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT,
RestrictionT>>>)
-                    inputBundle.getPCollection(),
                 transform.getSideInputs(),
                 transform.getMainOutputTag(),
                 transform.getAdditionalOutputTags().getAll(),

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 3619d05..506c84c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -117,7 +117,6 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements
Transfo
     DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator
=
         delegateFactory.createEvaluator(
             (AppliedPTransform) application,
-            (PCollection) inputBundle.getPCollection(),
             inputBundle.getKey(),
             doFn,
             application.getTransform().getUnderlyingParDo().getSideInputs(),

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index b15b52e..4f1b831 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -823,11 +823,10 @@ class WatermarkManager {
       inputWmsBuilder.add(THE_END_OF_TIME);
     }
     for (PValue pvalue : inputs.values()) {
-      if (graph.getPrimitiveConsumers(pvalue).contains(transform)) {
-        Watermark producerOutputWatermark =
-            getTransformWatermark(graph.getProducer(pvalue)).synchronizedProcessingOutputWatermark;
-        inputWmsBuilder.add(producerOutputWatermark);
-      }
+      Watermark producerOutputWatermark =
+          getTransformWatermark(graph.getProducer(pvalue))
+              .synchronizedProcessingOutputWatermark;
+      inputWmsBuilder.add(producerOutputWatermark);
     }
     return inputWmsBuilder.build();
   }
@@ -839,11 +838,9 @@ class WatermarkManager {
       inputWatermarksBuilder.add(THE_END_OF_TIME);
     }
     for (PValue pvalue : inputs.values()) {
-      if (graph.getPrimitiveConsumers(pvalue).contains(transform)) {
-        Watermark producerOutputWatermark =
-            getTransformWatermark(graph.getProducer(pvalue)).outputWatermark;
-        inputWatermarksBuilder.add(producerOutputWatermark);
-      }
+      Watermark producerOutputWatermark =
+          getTransformWatermark(graph.getProducer(pvalue)).outputWatermark;
+      inputWatermarksBuilder.add(producerOutputWatermark);
     }
     List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
     return inputCollectionWatermarks;

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 8b86bbe..09a21ac 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -98,7 +98,7 @@ public class ParDoEvaluatorTest {
     when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
 
     ParDoEvaluator<Integer> evaluator =
-        createEvaluator(singletonView, fn, inputPc, output);
+        createEvaluator(singletonView, fn, output);
 
     IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L));
     WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(3);
@@ -132,7 +132,6 @@ public class ParDoEvaluatorTest {
   private ParDoEvaluator<Integer> createEvaluator(
       PCollectionView<Integer> singletonView,
       RecorderFn fn,
-      PCollection<Integer> input,
       PCollection<Integer> output) {
     when(
             evaluationContext.createSideInputReader(
@@ -157,7 +156,8 @@ public class ParDoEvaluatorTest {
         evaluationContext,
         stepContext,
         transform,
-        input.getWindowingStrategy(),
+        ((PCollection<?>) Iterables.getOnlyElement(transform.getInputs().values()))
+            .getWindowingStrategy(),
         fn,
         null /* key */,
         ImmutableList.<PCollectionView<?>>of(singletonView),

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
index 6e70198..0439119 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.flink;
 import com.google.common.collect.Iterables;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -144,7 +143,7 @@ class FlinkBatchTranslationContext {
 
   @SuppressWarnings("unchecked")
   <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
   }
 
   Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index 74a5fb9..ea5f6b3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.Iterables;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -114,7 +113,7 @@ class FlinkStreamingTranslationContext {
 
   @SuppressWarnings("unchecked")
   public <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
   }
 
   public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T,
?> transform) {

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index fccd018..af93ef5 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,7 +56,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
 import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues;
@@ -396,9 +395,7 @@ public class DataflowPipelineTranslator {
 
     @Override
     public <InputT extends PValue> InputT getInput(PTransform<InputT, ?> transform)
{
-      return (InputT)
-          Iterables.getOnlyElement(
-              TransformInputs.nonAdditionalInputs(getCurrentTransform(transform)));
+      return (InputT) Iterables.getOnlyElement(getInputs(transform).values());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 0c6c4d1..8102926 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -26,7 +26,6 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.sdk.Pipeline;
@@ -104,8 +103,7 @@ public class EvaluationContext {
 
   public <T extends PValue> T getInput(PTransform<T, ?> transform) {
     @SuppressWarnings("unchecked")
-    T input =
-        (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
+    T input = (T) Iterables.getOnlyElement(getInputs(transform).values());
     return input;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index bebc306..630d24c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
@@ -67,7 +68,7 @@ public class TransformHierarchy {
     producers = new HashMap<>();
     producerInput = new HashMap<>();
     unexpandedInputs = new HashMap<>();
-    root = new Node();
+    root = new Node(null, null, "", null);
     current = root;
   }
 
@@ -254,36 +255,25 @@ public class TransformHierarchy {
     boolean finishedSpecifying = false;
 
     /**
-     * Creates the root-level node. The root level node has a null enclosing node, a null
transform,
-     * an empty map of inputs, and a name equal to the empty string.
-     */
-    private Node() {
-      this.enclosingNode = null;
-      this.transform = null;
-      this.fullName = "";
-      this.inputs = Collections.emptyMap();
-    }
-
-    /**
      * Creates a new Node with the given parent and transform.
      *
+     * <p>EnclosingNode and transform may both be null for a root-level node, which
holds all other
+     * nodes.
+     *
      * @param enclosingNode the composite node containing this node
      * @param transform the PTransform tracked by this node
      * @param fullName the fully qualified name of the transform
      * @param input the unexpanded input to the transform
      */
     private Node(
-        Node enclosingNode,
-        PTransform<?, ?> transform,
+        @Nullable Node enclosingNode,
+        @Nullable PTransform<?, ?> transform,
         String fullName,
-        PInput input) {
+        @Nullable PInput input) {
       this.enclosingNode = enclosingNode;
       this.transform = transform;
       this.fullName = fullName;
-      ImmutableMap.Builder<TupleTag<?>, PValue> inputs = ImmutableMap.builder();
-      inputs.putAll(input.expand());
-      inputs.putAll(transform.getAdditionalInputs());
-      this.inputs = inputs.build();
+      this.inputs = input == null ? Collections.<TupleTag<?>, PValue>emptyMap()
: input.expand();
     }
 
     /**


Mime
View raw message