beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] beam git commit: [BEAM-1737] Implement a Single-output ParDo as a Multi-output ParDo with a single output.
Date Sun, 09 Apr 2017 08:04:43 GMT
Repository: beam
Updated Branches:
  refs/heads/master 810db7f99 -> a0cfccda4


[BEAM-1737] Implement a Single-output ParDo as a Multi-output ParDo with a single output.

Remove use of EvaluationContext in the DStream lambda; it is not serializable and is also
redundant in this case.

Implement single-output ParDo as a multi-output ParDo (MultiDo).

Cache only if this is an actual MultiDo, i.e. the transform has more than one output.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e294dc0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e294dc0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e294dc0

Branch: refs/heads/master
Commit: 9e294dc05c1f1ac2aa2ac15f364394698a2767a7
Parents: 810db7f
Author: Amit Sela <amitsela33@gmail.com>
Authored: Tue Apr 4 11:46:38 2017 +0300
Committer: Amit Sela <amitsela33@gmail.com>
Committed: Sun Apr 9 10:05:31 2017 +0300

----------------------------------------------------------------------
 .../runners/spark/translation/DoFnFunction.java | 130 -------------------
 .../spark/translation/TransformTranslator.java  |  74 ++++-------
 .../streaming/StreamingTransformTranslator.java |  78 +++--------
 3 files changed, 47 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9e294dc0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
deleted file mode 100644
index 11761b6..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-import org.apache.beam.runners.spark.aggregators.SparkAggregators;
-import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
-import org.apache.beam.runners.spark.util.SideInputBroadcast;
-import org.apache.beam.runners.spark.util.SparkSideInputReader;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.spark.Accumulator;
-import org.apache.spark.api.java.function.FlatMapFunction;
-
-
-/**
- * Beam's Do functions correspond to Spark's FlatMap functions.
- *
- * @param <InputT>  Input element type.
- * @param <OutputT> Output element type.
- */
-public class DoFnFunction<InputT, OutputT>
-    implements FlatMapFunction<Iterator<WindowedValue<InputT>>, WindowedValue<OutputT>>
{
-
-  private final Accumulator<NamedAggregators> aggregatorsAccum;
-  private final Accumulator<SparkMetricsContainer> metricsAccum;
-  private final String stepName;
-  private final DoFn<InputT, OutputT> doFn;
-  private final SparkRuntimeContext runtimeContext;
-  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
sideInputs;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-
-  /**
-   * @param aggregatorsAccum  The Spark {@link Accumulator} that backs the Beam Aggregators.
-   * @param doFn              The {@link DoFn} to be wrapped.
-   * @param runtimeContext    The {@link SparkRuntimeContext}.
-   * @param sideInputs        Side inputs used in this {@link DoFn}.
-   * @param windowingStrategy Input {@link WindowingStrategy}.
-   */
-  public DoFnFunction(
-      Accumulator<NamedAggregators> aggregatorsAccum,
-      Accumulator<SparkMetricsContainer> metricsAccum,
-      String stepName,
-      DoFn<InputT, OutputT> doFn,
-      SparkRuntimeContext runtimeContext,
-      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
sideInputs,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    this.aggregatorsAccum = aggregatorsAccum;
-    this.metricsAccum = metricsAccum;
-    this.stepName = stepName;
-    this.doFn = doFn;
-    this.runtimeContext = runtimeContext;
-    this.sideInputs = sideInputs;
-    this.windowingStrategy = windowingStrategy;
-  }
-
-  @Override
-  public Iterable<WindowedValue<OutputT>> call(
-      Iterator<WindowedValue<InputT>> iter) throws Exception {
-    DoFnOutputManager outputManager = new DoFnOutputManager();
-
-    DoFnRunner<InputT, OutputT> doFnRunner =
-        DoFnRunners.simpleRunner(
-            runtimeContext.getPipelineOptions(),
-            doFn,
-            new SparkSideInputReader(sideInputs),
-            outputManager,
-            new TupleTag<OutputT>() {
-            },
-            Collections.<TupleTag<?>>emptyList(),
-            new SparkProcessContext.NoOpStepContext(),
-            new SparkAggregators.Factory(runtimeContext, aggregatorsAccum),
-            windowingStrategy);
-
-    DoFnRunner<InputT, OutputT> doFnRunnerWithMetrics =
-        new DoFnRunnerWithMetrics<>(stepName, doFnRunner, metricsAccum);
-
-    return new SparkProcessContext<>(doFn, doFnRunnerWithMetrics, outputManager)
-        .processPartition(iter);
-  }
-
-  private class DoFnOutputManager
-      implements SparkProcessContext.SparkOutputManager<WindowedValue<OutputT>>
{
-
-    private final List<WindowedValue<OutputT>> outputs = new LinkedList<>();
-
-    @Override
-    public void clear() {
-      outputs.clear();
-    }
-
-    @Override
-    public Iterator<WindowedValue<OutputT>> iterator() {
-      return outputs.iterator();
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public synchronized <T> void output(TupleTag<T> tag, WindowedValue<T>
output) {
-      outputs.add((WindowedValue<OutputT>) output);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/9e294dc0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 6290bba..7894c4e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -24,7 +24,6 @@ import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectS
 import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.util.Collections;
@@ -334,8 +333,7 @@ public final class TransformTranslator {
     };
   }
 
-  private static <InputT, OutputT> TransformEvaluator<ParDo.MultiOutput<InputT,
OutputT>>
-  parDo() {
+  private static <InputT, OutputT> TransformEvaluator<ParDo.MultiOutput<InputT,
OutputT>> parDo() {
     return new TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>>() {
       @Override
       public void evaluate(
@@ -351,51 +349,31 @@ public final class TransformTranslator {
             context.getInput(transform).getWindowingStrategy();
         Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance();
         Accumulator<SparkMetricsContainer> metricsAccum = MetricsAccumulator.getInstance();
-        Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
sideInputs =
-            TranslationUtils.getSideInputs(transform.getSideInputs(), context);
-        if (transform.getSideOutputTags().size() == 0) {
-          // Don't tag with the output and filter for a single-output ParDo, as it's additional
-          // identity transforms.
-          // Also see BEAM-1737 for failures when the two versions are condensed.
-          PCollection<OutputT> output =
-              (PCollection<OutputT>)
-                  Iterables.getOnlyElement(context.getOutputs(transform)).getValue();
-          context.putDataset(
-              output,
-              new BoundedDataset<>(
-                  inRDD.mapPartitions(
-                      new DoFnFunction<>(
-                          aggAccum,
-                          metricsAccum,
-                          stepName,
-                          doFn,
-                          context.getRuntimeContext(),
-                          sideInputs,
-                          windowingStrategy))));
-        } else {
-          JavaPairRDD<TupleTag<?>, WindowedValue<?>> all =
-              inRDD
-                  .mapPartitionsToPair(
-                      new MultiDoFnFunction<>(
-                          aggAccum,
-                          metricsAccum,
-                          stepName,
-                          doFn,
-                          context.getRuntimeContext(),
-                          transform.getMainOutputTag(),
-                          TranslationUtils.getSideInputs(transform.getSideInputs(), context),
-                          windowingStrategy))
-                  .cache();
-          for (TaggedPValue output : context.getOutputs(transform)) {
-            @SuppressWarnings("unchecked")
-            JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
-                all.filter(new TranslationUtils.TupleTagFilter(output.getTag()));
-            @SuppressWarnings("unchecked")
-            // Object is the best we can do since different outputs can have different tags
-            JavaRDD<WindowedValue<Object>> values =
-                (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
-            context.putDataset(output.getValue(), new BoundedDataset<>(values));
-          }
+        JavaPairRDD<TupleTag<?>, WindowedValue<?>> all =
+            inRDD.mapPartitionsToPair(
+                new MultiDoFnFunction<>(
+                    aggAccum,
+                    metricsAccum,
+                    stepName,
+                    doFn,
+                    context.getRuntimeContext(),
+                    transform.getMainOutputTag(),
+                    TranslationUtils.getSideInputs(transform.getSideInputs(), context),
+                    windowingStrategy));
+        List<TaggedPValue> outputs = context.getOutputs(transform);
+        if (outputs.size() > 1) {
+          // cache the RDD if we're going to filter it more than once.
+          all.cache();
+        }
+        for (TaggedPValue output : outputs) {
+          @SuppressWarnings("unchecked")
+          JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
+              all.filter(new TranslationUtils.TupleTagFilter(output.getTag()));
+          @SuppressWarnings("unchecked")
+          // Object is the best we can do since different outputs can have different tags
+          JavaRDD<WindowedValue<Object>> values =
+              (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
+          context.putDataset(output.getValue(), new BoundedDataset<>(values));
         }
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9e294dc0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 2d2854f..d4c6c9d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -43,7 +43,6 @@ import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
 import org.apache.beam.runners.spark.translation.BoundedDataset;
 import org.apache.beam.runners.spark.translation.Dataset;
-import org.apache.beam.runners.spark.translation.DoFnFunction;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
 import org.apache.beam.runners.spark.translation.MultiDoFnFunction;
@@ -368,8 +367,7 @@ public final class StreamingTransformTranslator {
     };
   }
 
-  private static <InputT, OutputT> TransformEvaluator<ParDo.MultiOutput<InputT,
OutputT>>
-  multiDo() {
+  private static <InputT, OutputT> TransformEvaluator<ParDo.MultiOutput<InputT,
OutputT>> parDo() {
     return new TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>>() {
       public void evaluate(
           final ParDo.MultiOutput<InputT, OutputT> transform, final EvaluationContext
context) {
@@ -388,16 +386,14 @@ public final class StreamingTransformTranslator {
 
         final String stepName = context.getCurrentTransform().getFullName();
         if (transform.getSideOutputTags().size() == 0) {
-          // Don't tag with the output and filter for a single-output ParDo, as it's additional
-          // identity transforms.
-          // Also see BEAM-1737 for failures when the two versions are condensed.
-          JavaDStream<WindowedValue<OutputT>> outStream =
-              dStream.transform(
-                  new Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>>()
{
+          JavaPairDStream<TupleTag<?>, WindowedValue<?>> all =
+              dStream.transformToPair(
+                  new Function<
+                      JavaRDD<WindowedValue<InputT>>,
+                      JavaPairRDD<TupleTag<?>, WindowedValue<?>>>()
{
                     @Override
-                    public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>>
rdd)
-                        throws Exception {
-                      final JavaSparkContext jsc = new JavaSparkContext(rdd.context());
+                    public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(
+                        JavaRDD<WindowedValue<InputT>> rdd) throws Exception
{
                       final Accumulator<NamedAggregators> aggAccum =
                           AggregatorsAccumulator.getInstance();
                       final Accumulator<SparkMetricsContainer> metricsAccum =
@@ -405,59 +401,27 @@ public final class StreamingTransformTranslator {
                       final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>,
SideInputBroadcast<?>>>
                           sideInputs =
                               TranslationUtils.getSideInputs(
-                                  transform.getSideInputs(), jsc, pviews);
-                      return rdd.mapPartitions(
-                          new DoFnFunction<>(
+                                  transform.getSideInputs(),
+                                  JavaSparkContext.fromSparkContext(rdd.context()),
+                                  pviews);
+                      return rdd.mapPartitionsToPair(
+                          new MultiDoFnFunction<>(
                               aggAccum,
                               metricsAccum,
                               stepName,
                               doFn,
                               runtimeContext,
+                              transform.getMainOutputTag(),
                               sideInputs,
                               windowingStrategy));
                     }
                   });
-
-          PCollection<OutputT> output =
-              (PCollection<OutputT>)
-                  Iterables.getOnlyElement(context.getOutputs(transform)).getValue();
-          context.putDataset(
-              output, new UnboundedDataset<>(outStream, unboundedDataset.getStreamSources()));
-        } else {
-          JavaPairDStream<TupleTag<?>, WindowedValue<?>> all =
-              dStream
-                  .transformToPair(
-                      new Function<
-                          JavaRDD<WindowedValue<InputT>>,
-                          JavaPairRDD<TupleTag<?>, WindowedValue<?>>>()
{
-                        @Override
-                        public JavaPairRDD<TupleTag<?>, WindowedValue<?>>
call(
-                            JavaRDD<WindowedValue<InputT>> rdd) throws Exception
{
-                          String stepName = context.getCurrentTransform().getFullName();
-                          final Accumulator<NamedAggregators> aggAccum =
-                              AggregatorsAccumulator.getInstance();
-                          final Accumulator<SparkMetricsContainer> metricsAccum =
-                              MetricsAccumulator.getInstance();
-                          final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>,
SideInputBroadcast<?>>>
-                              sideInputs =
-                                  TranslationUtils.getSideInputs(
-                                      transform.getSideInputs(),
-                                      JavaSparkContext.fromSparkContext(rdd.context()),
-                                      pviews);
-                          return rdd.mapPartitionsToPair(
-                              new MultiDoFnFunction<>(
-                                  aggAccum,
-                                  metricsAccum,
-                                  stepName,
-                                  doFn,
-                                  runtimeContext,
-                                  transform.getMainOutputTag(),
-                                  sideInputs,
-                                  windowingStrategy));
-                        }
-                      })
-                  .cache();
-          for (TaggedPValue output : context.getOutputs(transform)) {
+          List<TaggedPValue> outputs = context.getOutputs(transform);
+          if (outputs.size() > 1) {
+            // cache the DStream if we're going to filter it more than once.
+            all.cache();
+          }
+          for (TaggedPValue output : outputs) {
             @SuppressWarnings("unchecked")
             JavaPairDStream<TupleTag<?>, WindowedValue<?>> filtered =
                 all.filter(new TranslationUtils.TupleTagFilter(output.getTag()));
@@ -525,7 +489,7 @@ public final class StreamingTransformTranslator {
     EVALUATORS.put(Read.Unbounded.class, readUnbounded());
     EVALUATORS.put(GroupByKey.class, groupByKey());
     EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
-    EVALUATORS.put(ParDo.MultiOutput.class, multiDo());
+    EVALUATORS.put(ParDo.MultiOutput.class, parDo());
     EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
     EVALUATORS.put(CreateStream.class, createFromQueue());
     EVALUATORS.put(Window.Assign.class, window());


Mime
View raw message