beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] beam git commit: Remove PValue.getProducingTransformInternal
Date Tue, 21 Feb 2017 23:21:56 GMT
Remove PValue.getProducingTransformInternal

This exposes too much information on the structure of the Graph to be
appropriate for non-graph-related APIs. It also is not stable, and
subject to change at any point due to Pipeline Surgery APIs.

Update DataflowRunner to use the visited TransformHierarchy to obtain
the producing transforms for PValues, and store these values to
construct the graph.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b6863b1d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b6863b1d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b6863b1d

Branch: refs/heads/master
Commit: b6863b1ddba1fd5ccf5ac61da5aca74116ffa77a
Parents: 154c543
Author: Thomas Groh <tgroh@google.com>
Authored: Tue Feb 21 13:55:23 2017 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Tue Feb 21 15:21:41 2017 -0800

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 97 ++++++++++----------
 .../beam/runners/dataflow/DataflowRunner.java   |  3 +-
 .../runners/dataflow/TransformTranslator.java   | 13 ++-
 .../DataflowPipelineTranslatorTest.java         |  7 +-
 .../java/org/apache/beam/sdk/values/PValue.java |  9 --
 5 files changed, 67 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b6863b1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 55f3e5e..6eec603 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -92,6 +92,7 @@ import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
@@ -255,6 +256,11 @@ public class DataflowPipelineTranslator {
     private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
 
     /**
+     * A Map from {@link PValue} to the {@link AppliedPTransform} that produces that {@link
PValue}.
+     */
+    private final Map<PValue, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
+
+    /**
      * A Map from PValues to their output names used by their producer
      * Dataflow steps.
      */
@@ -421,15 +427,11 @@ public class DataflowPipelineTranslator {
 
     @Override
     public void visitValue(PValue value, TransformHierarchy.Node producer) {
+      producers.put(value, producer.toAppliedPTransform());
       LOG.debug("Checking translation of {}", value);
-      if (value.getProducingTransformInternal() == null) {
-        throw new RuntimeException(
-            "internal error: expecting a PValue "
-            + "to have a producingTransform");
-      }
       if (!producer.isCompositeNode()) {
         // Primitive transforms are the only ones assigned step names.
-        asOutputReference(value);
+        asOutputReference(value, producer.toAppliedPTransform());
       }
     }
 
@@ -500,24 +502,25 @@ public class DataflowPipelineTranslator {
       return step;
     }
 
-    @Override
-    public OutputReference asOutputReference(PValue value) {
-      AppliedPTransform<?, ?, ?> transform =
-          value.getProducingTransformInternal();
-      String stepName = stepNames.get(transform);
-      if (stepName == null) {
-        throw new IllegalArgumentException(transform + " doesn't have a name specified");
-      }
+    public OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?>
producer) {
+      String stepName = stepNames.get(producer);
+      checkArgument(stepName != null, "%s doesn't have a name specified", producer);
 
       String outputName = outputNames.get(value);
-      if (outputName == null) {
-        throw new IllegalArgumentException(
-            "output " + value + " doesn't have a name specified");
-      }
+      checkArgument(outputName != null, "output %s doesn't have a name specified", value);
 
       return new OutputReference(stepName, outputName);
     }
 
+    @Override
+    public AppliedPTransform<?, ?, ?> getProducer(PValue value) {
+      return checkNotNull(
+          producers.get(value),
+          "Unknown producer for value %s while translating step %s",
+          value,
+          currentTransform.getFullName());
+    }
+
     /**
      * Returns a fresh Dataflow step name.
      */
@@ -585,7 +588,8 @@ public class DataflowPipelineTranslator {
     @Override
     public void addInput(String name, PInput value) {
       if (value instanceof PValue) {
-        addInput(name, translator.asOutputReference((PValue) value));
+        PValue pvalue = (PValue) value;
+        addInput(name, translator.asOutputReference(pvalue, translator.getProducer(pvalue)));
       } else {
         throw new IllegalStateException("Input must be a PValue");
       }
@@ -707,9 +711,9 @@ public class DataflowPipelineTranslator {
               View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext
context) {
             StepTranslationContext stepContext =
                 context.addStep(transform, "CollectionToSingleton");
-            stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            stepContext.addCollectionToSingletonOutput(
-                context.getInput(transform), context.getOutput(transform));
+            PCollection<ElemT> input = context.getInput(transform);
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
+            stepContext.addCollectionToSingletonOutput(input, context.getOutput(transform));
           }
         });
 
@@ -748,19 +752,19 @@ public class DataflowPipelineTranslator {
         new TransformTranslator<Flatten.FlattenPCollectionList>() {
           @Override
           public void translate(
-              Flatten.FlattenPCollectionList transform,
-              TranslationContext context) {
+              Flatten.FlattenPCollectionList transform, TranslationContext context) {
             flattenHelper(transform, context);
           }
 
           private <T> void flattenHelper(
-              Flatten.FlattenPCollectionList<T> transform,
-              TranslationContext context) {
+              Flatten.FlattenPCollectionList<T> transform, TranslationContext context)
{
             StepTranslationContext stepContext = context.addStep(transform, "Flatten");
 
             List<OutputReference> inputs = new LinkedList<>();
             for (TaggedPValue input : context.getInputs(transform)) {
-              inputs.add(context.asOutputReference(input.getValue()));
+              inputs.add(
+                  context.asOutputReference(
+                      input.getValue(), context.getProducer(input.getValue())));
             }
             stepContext.addInput(PropertyNames.INPUTS, inputs);
             stepContext.addOutput(context.getOutput(transform));
@@ -778,7 +782,8 @@ public class DataflowPipelineTranslator {
           private <K1, K2, V> void groupByKeyAndSortValuesHelper(
               GroupByKeyAndSortValuesOnly<K1, K2, V> transform, TranslationContext
context) {
             StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
-            stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+            PCollection<KV<K1, KV<K2, V>>> input = context.getInput(transform);
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
             stepContext.addOutput(context.getOutput(transform));
             stepContext.addInput(PropertyNames.SORT_VALUES, true);
 
@@ -791,30 +796,26 @@ public class DataflowPipelineTranslator {
         GroupByKey.class,
         new TransformTranslator<GroupByKey>() {
           @Override
-          public void translate(
-              GroupByKey transform,
-              TranslationContext context) {
+          public void translate(GroupByKey transform, TranslationContext context) {
             groupByKeyHelper(transform, context);
           }
 
           private <K, V> void groupByKeyHelper(
-              GroupByKey<K, V> transform,
-              TranslationContext context) {
+              GroupByKey<K, V> transform, TranslationContext context) {
             StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
-            stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+            PCollection<KV<K, V>> input = context.getInput(transform);
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
             stepContext.addOutput(context.getOutput(transform));
 
-            WindowingStrategy<?, ?> windowingStrategy =
-                context.getInput(transform).getWindowingStrategy();
+            WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
             boolean isStreaming =
                 context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
             boolean disallowCombinerLifting =
                 !windowingStrategy.getWindowFn().isNonMerging()
-                || (isStreaming && !transform.fewKeys())
-                // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
-                || !(windowingStrategy.getTrigger() instanceof DefaultTrigger);
-            stepContext.addInput(
-                PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
+                    || (isStreaming && !transform.fewKeys())
+                    // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
+                    || !(windowingStrategy.getTrigger() instanceof DefaultTrigger);
+            stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
             stepContext.addInput(
                 PropertyNames.SERIALIZED_FN,
                 byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
@@ -888,18 +889,16 @@ public class DataflowPipelineTranslator {
             translateHelper(transform, context);
           }
 
-          private <T> void translateHelper(
-              Window.Bound<T> transform, TranslationContext context) {
+          private <T> void translateHelper(Window.Bound<T> transform, TranslationContext
context) {
             StepTranslationContext stepContext = context.addStep(transform, "Bucket");
-            stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+            PCollection<T> input = context.getInput(transform);
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
             stepContext.addOutput(context.getOutput(transform));
 
-            WindowingStrategy<?, ?> strategy =
-                context.getOutput(transform).getWindowingStrategy();
+            WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
             byte[] serializedBytes = serializeToByteArray(strategy);
             String serializedJson = byteArrayToJsonString(serializedBytes);
-            assert Arrays.equals(serializedBytes,
-                                 jsonStringToByteArray(serializedJson));
+            assert Arrays.equals(serializedBytes, jsonStringToByteArray(serializedJson));
             stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
           }
         });
@@ -929,7 +928,7 @@ public class DataflowPipelineTranslator {
     for (PCollectionView<?> view : sideInputs) {
       nonParInputs.put(
           view.getTagInternal().getId(),
-          context.asOutputReference(view));
+          context.asOutputReference(view, context.getProducer(view)));
     }
 
     stepContext.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);

http://git-wip-us.apache.org/repos/asf/beam/blob/b6863b1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fcba9be..e5ed933 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -874,7 +874,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
         stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(
             overriddenTransform.getElementCoder()));
       }
-      stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+      PCollection<T> input = context.getInput(transform);
+      stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b6863b1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 6a82672..e020e83 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.OutputReference;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
@@ -73,7 +74,12 @@ interface TransformTranslator<TransformT extends PTransform> {
      */
     Step addStep(PTransform<?, ? extends PValue> transform, Step step);
     /** Encode a PValue reference as an output reference. */
-    OutputReference asOutputReference(PValue value);
+    OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?> producer);
+
+    /**
+     * Get the {@link AppliedPTransform} that produced the provided {@link PValue}.
+     */
+    AppliedPTransform<?, ?, ?> getProducer(PValue value);
   }
 
   /** The interface for a {@link TransformTranslator} to build a Dataflow step. */
@@ -93,6 +99,11 @@ interface TransformTranslator<TransformT extends PTransform> {
     /**
      * Adds an input with the given name to this Dataflow step, coming from the specified
input
      * PValue.
+     *
+     * <p>The input {@link PValue} must have already been produced by a step earlier
in this {@link
+     * Pipeline}. If the input value has not yet been produced (either by a call to {@link
+     * StepTranslationContext#addOutput(PValue)} or within a call to {@link
+     * TranslationContext#addStep(PTransform, Step)}), this method will throw an exception.
      */
     void addInput(String name, PInput value);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b6863b1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 2ff1032..5d13c3e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -565,8 +565,11 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   private static class EmbeddedTranslator
       implements TransformTranslator<EmbeddedTransform> {
     @Override public void translate(EmbeddedTransform transform, TranslationContext context)
{
-      addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT,
-          context.asOutputReference(context.getInput(transform)));
+      PCollection<String> input = context.getInput(transform);
+      addObject(
+          transform.step.getProperties(),
+          PropertyNames.PARALLEL_INPUT,
+          context.asOutputReference(input, context.getProducer(input)));
       context.addStep(transform, transform.step);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b6863b1d/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
index 052a1f3..4c62972 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.values;
 
 import java.util.List;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
@@ -32,14 +31,6 @@ public interface PValue extends POutput, PInput {
   String getName();
 
   /**
-   * Returns the {@link AppliedPTransform} that this {@link PValue} is an output of.
-   *
-   * <p>For internal use only.
-   */
-  @Deprecated
-  AppliedPTransform<?, ?, ?> getProducingTransformInternal();
-
-  /**
    * {@inheritDoc}.
    *
    * <p>A {@link PValue} always expands into itself. Calling {@link #expand()} on a
PValue is almost


Mime
View raw message