beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/3] beam git commit: [BEAM-831] ParDo Fusion of Apex Runner
Date Mon, 08 May 2017 04:58:46 GMT
Repository: beam
Updated Branches:
  refs/heads/master 53dd0a529 -> 1a4ca9acd


[BEAM-831] ParDo Fusion of Apex Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/55a7cd4f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/55a7cd4f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/55a7cd4f

Branch: refs/heads/master
Commit: 55a7cd4f97f2e3fee709f034a0748284c9030df7
Parents: 019d300
Author: chinmaykolhatkar <chinmay@apache.org>
Authored: Wed Mar 1 16:59:46 2017 +0530
Committer: Thomas Weise <thw@apache.org>
Committed: Sat May 6 19:52:13 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/apex/ApexPipelineOptions.java  |   5 +
 .../apex/translation/TranslationContext.java    | 103 +++++++++++++++++--
 2 files changed, 97 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/55a7cd4f/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
index f37e874..92f6e8f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -62,4 +62,9 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab
   @Default.String("classpath:/beam-runners-apex.properties")
   String getConfigFile();
 
+  @Description("configure whether to perform ParDo fusion")
+  void setParDoFusionEnabled(boolean enabled);
+
+  @Default.Boolean(true)
+  boolean isParDoFusionEnabled();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/55a7cd4f/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index 9c20449..1224e25 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -37,6 +37,8 @@ import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -54,7 +56,8 @@ class TranslationContext {
 
   private final ApexPipelineOptions pipelineOptions;
   private AppliedPTransform<?, ?, ?> currentTransform;
-  private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>();
+  private final Map<PCollection, Pair<OutputPortInfo, List<InputPortInfo>>> streams =
+          new HashMap<>();
   private final Map<String, Operator> operators = new HashMap<>();
   private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>();
   private Map<PInput, PInput> aliasCollections = new HashMap<>();
@@ -122,8 +125,10 @@ class TranslationContext {
         addOperator(operator, portEntry.getValue(), portEntry.getKey());
         first = false;
       } else {
-        this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(),
-            new ArrayList<>()));
+        this.streams.put(portEntry.getKey(),
+                         (Pair) new ImmutablePair<>(new OutputPortInfo(portEntry.getValue(),
+                                                                       getCurrentTransform()),
+                                                    new ArrayList<>()));
       }
     }
   }
@@ -142,16 +147,19 @@ class TranslationContext {
       name = getCurrentTransform().getFullName() + i;
     }
     this.operators.put(name, operator);
-    this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>()));
+    this.streams.put(output, (Pair) new ImmutablePair<>(
+            new OutputPortInfo(port, getCurrentTransform()),
+            new ArrayList<>()));
   }
 
   public void addStream(PInput input, InputPort inputPort) {
     while (aliasCollections.containsKey(input)) {
       input = aliasCollections.get(input);
     }
-    Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input);
-    checkArgument(stream != null, "no upstream operator defined for %s", input);
-    stream.getRight().add(inputPort);
+
+    Pair<OutputPortInfo, List<InputPortInfo>> stream = this.streams.get(input);
+    checkArgument(stream != null, "no upstream operator defined for " + input);
+    stream.getRight().add(new InputPortInfo(inputPort, getCurrentTransform()));
   }
 
   /**
@@ -168,13 +176,23 @@ class TranslationContext {
     for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) {
       dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue());
     }
+
     int streamIndex = 0;
-    for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this.
+    for (Map.Entry<PCollection, Pair<OutputPortInfo, List<InputPortInfo>>> streamEntry : this.
         streams.entrySet()) {
-      List<InputPort<?>> sinksList = streamEntry.getValue().getRight();
-      InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]);
+      List<InputPortInfo> destInfo = streamEntry.getValue().getRight();
+      InputPort[] sinks = new InputPort[destInfo.size()];
+      for (int i = 0; i < sinks.length; i++) {
+        sinks[i] = destInfo.get(i).port;
+      }
+
       if (sinks.length > 0) {
-        dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks);
+        DAG.StreamMeta streamMeta = dag.addStream("stream" + streamIndex++,
+                                                  streamEntry.getValue().getLeft().port, sinks);
+        if (pipelineOptions.isParDoFusionEnabled()) {
+          optimizeStreams(streamMeta, streamEntry);
+        }
+
         for (InputPort port : sinks) {
           PCollection pc = streamEntry.getKey();
           Coder coder = pc.getCoder();
@@ -191,6 +209,49 @@ class TranslationContext {
     }
   }
 
+  private void optimizeStreams(DAG.StreamMeta streamMeta,
+                               Map.Entry<PCollection,
+                                         Pair<OutputPortInfo, List<InputPortInfo>>> streamEntry) {
+    DAG.Locality loc = null;
+
+    List<InputPortInfo> sinks = streamEntry.getValue().getRight();
+    OutputPortInfo source = streamEntry.getValue().getLeft();
+    PTransform sourceTransform = source.transform.getTransform();
+    if (sourceTransform instanceof ParDo.Bound
+        || sourceTransform instanceof ParDo.BoundMulti) {
+      // Source is ParDo.. Check sink(s)
+      for (InputPortInfo sink : sinks) {
+        PTransform transform = sink.transform.getTransform();
+        if (transform instanceof ParDo.Bound) {
+          ParDo.Bound t = (ParDo.Bound) transform;
+          if (t.getSideInputs().size() > 0) {
+            loc = DAG.Locality.CONTAINER_LOCAL;
+            break;
+          } else {
+            loc = DAG.Locality.THREAD_LOCAL;
+          }
+        } else if (transform instanceof ParDo.BoundMulti) {
+          ParDo.BoundMulti t = (ParDo.BoundMulti) transform;
+          if (t.getSideInputs().size() > 0) {
+            loc = DAG.Locality.CONTAINER_LOCAL;
+            break;
+          } else {
+            loc = DAG.Locality.THREAD_LOCAL;
+          }
+        } else {
+          // Sink is not ParDo.. set null locality.
+          loc = null;
+          break;
+        }
+      }
+    } else {
+      // Source is not ParDo... set null locality
+      loc = null;
+    }
+
+    streamMeta.setLocality(loc);
+  }
+
   /**
    * Return the state backend for the pipeline translation.
    * @return
@@ -198,4 +259,24 @@ class TranslationContext {
   public ApexStateBackend getStateBackend() {
     return new ApexStateInternals.ApexStateBackend();
   }
+
+  static class InputPortInfo {
+    InputPort port;
+    AppliedPTransform transform;
+
+    public InputPortInfo(InputPort port, AppliedPTransform transform) {
+      this.port = port;
+      this.transform = transform;
+    }
+  }
+
+  static class OutputPortInfo {
+    OutputPort port;
+    AppliedPTransform transform;
+
+    public OutputPortInfo(OutputPort port, AppliedPTransform transform) {
+      this.port = port;
+      this.transform = transform;
+    }
+  }
 }


Mime
View raw message