beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [09/36] beam git commit: mr-runner: hack to get around that ViewAsXXX.expand() return wrong output PValue.
Date Thu, 07 Sep 2017 18:39:18 GMT
mr-runner: hack to get around that ViewAsXXX.expand() return wrong output PValue.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5905efd3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5905efd3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5905efd3

Branch: refs/heads/mr-runner
Commit: 5905efd3364f2cd27567126508576aac887a1f63
Parents: 98da2a2
Author: Pei He <pei@apache.org>
Authored: Wed Aug 2 21:59:21 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Thu Aug 31 14:13:48 2017 +0800

----------------------------------------------------------------------
 .../translation/TranslationContext.java         | 54 +++++++++++++++-----
 1 file changed, 42 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5905efd3/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
index 2b51df5..365bdc0 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
@@ -22,13 +22,17 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -82,6 +86,11 @@ public class TranslationContext {
       this.currentNode = node;
       for (Map.Entry<TupleTag<?>, PValue> entry : currentNode.getOutputs().entrySet()) {
         pValueToTupleTag.put(entry.getValue(), entry.getKey());
+        // TODO: this is a hack to get around that ViewAsXXX.expand() return wrong output PValue.
+        if (node.getTransform() instanceof View.CreatePCollectionView) {
+          View.CreatePCollectionView view = (View.CreatePCollectionView) node.getTransform();
+          pValueToTupleTag.put(view.getView(), view.getView().getTagInternal());
+        }
       }
     }
 
@@ -98,29 +107,50 @@ public class TranslationContext {
     }
 
     public List<Graphs.Tag> getInputTags() {
-      return FluentIterable.from(currentNode.getInputs().values())
+      Iterable<PValue> inputs;
+      if (currentNode.getTransform() instanceof ParDo.MultiOutput) {
+        ParDo.MultiOutput parDo = (ParDo.MultiOutput) currentNode.getTransform();
+        inputs = ImmutableList.<PValue>builder()
+            .add(getInput()).addAll(parDo.getSideInputs()).build();
+      } else {
+        inputs = currentNode.getInputs().values();
+      }
+      return FluentIterable.from(inputs)
           .transform(new Function<PValue, Graphs.Tag>() {
             @Override
             public Graphs.Tag apply(PValue pValue) {
               checkState(
                   pValueToTupleTag.containsKey(pValue),
                   String.format("Failed to find TupleTag for pValue: %s.", pValue));
-              PCollection<?> pc = (PCollection<?>) pValue;
-              return Graphs.Tag.of(
-                  pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder());
+              if (pValue instanceof PCollection) {
+                PCollection<?> pc = (PCollection<?>) pValue;
+                return Graphs.Tag.of(
+                    pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder());
+              } else {
+                return Graphs.Tag.of(
+                    pValue.getName(),
+                    pValueToTupleTag.get(pValue),
+                    ((PCollectionView) pValue).getCoderInternal());
+              }
             }})
           .toList();
     }
 
     public List<Graphs.Tag> getOutputTags() {
-      return FluentIterable.from(currentNode.getOutputs().entrySet())
-          .transform(new Function<Map.Entry<TupleTag<?>, PValue>, Graphs.Tag>() {
-            @Override
-            public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) {
-              PCollection<?> pc = (PCollection<?>) entry.getValue();
-              return Graphs.Tag.of(pc.getName(), entry.getKey(), pc.getCoder());
-            }})
-          .toList();
+      if (currentNode.getTransform() instanceof View.CreatePCollectionView) {
+        PCollectionView view = ((View.CreatePCollectionView) currentNode.getTransform()).getView();
+        return ImmutableList.of(
+            Graphs.Tag.of(view.getName(), view.getTagInternal(), view.getCoderInternal()));
+      } else {
+        return FluentIterable.from(currentNode.getOutputs().entrySet())
+            .transform(new Function<Map.Entry<TupleTag<?>, PValue>, Graphs.Tag>() {
+              @Override
+              public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) {
+                PCollection<?> pc = (PCollection<?>) entry.getValue();
+                return Graphs.Tag.of(pc.getName(), entry.getKey(), pc.getCoder());
+              }})
+            .toList();
+      }
     }
 
     public TupleTag<?> getOnlyOutputTag() {


Mime
View raw message