beam-commits mailing list archives

From amits...@apache.org
Subject [19/23] incubator-beam git commit: [BEAM-11] second iteration of package reorganisation
Date Tue, 15 Mar 2016 18:48:16 GMT
[BEAM-11] second iteration of package reorganisation


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eb0341d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eb0341d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eb0341d4

Branch: refs/heads/master
Commit: eb0341d4df6a14d8d815ebfc51ffc0254923a8fa
Parents: 3980f44
Author: Sela <ansela@paypal.com>
Authored: Mon Mar 14 23:48:05 2016 +0200
Committer: Sela <ansela@paypal.com>
Committed: Tue Mar 15 20:40:18 2016 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/spark/DoFnFunction.java |  97 ---
 .../beam/runners/spark/EvaluationContext.java   | 287 -------
 .../beam/runners/spark/MultiDoFnFunction.java   | 119 ---
 .../beam/runners/spark/SparkContextFactory.java |  69 --
 .../runners/spark/SparkPipelineEvaluator.java   |  55 --
 .../spark/SparkPipelineOptionsFactory.java      |  30 -
 .../spark/SparkPipelineOptionsRegistrar.java    |  30 -
 .../beam/runners/spark/SparkPipelineRunner.java |   8 +-
 .../spark/SparkPipelineRunnerRegistrar.java     |  30 -
 .../runners/spark/SparkPipelineTranslator.java  |  30 -
 .../beam/runners/spark/SparkProcessContext.java | 262 ------
 .../beam/runners/spark/SparkRuntimeContext.java | 217 -----
 .../spark/SparkStreamingPipelineOptions.java    |  41 +
 .../beam/runners/spark/TransformEvaluator.java  |  27 -
 .../beam/runners/spark/TransformTranslator.java | 808 -------------------
 .../beam/runners/spark/WindowingHelpers.java    |  62 --
 .../spark/aggregators/NamedAggregators.java     |   2 +-
 .../SparkStreamingPipelineOptions.java          |  43 -
 .../SparkStreamingPipelineOptionsFactory.java   |  30 -
 .../SparkStreamingPipelineOptionsRegistrar.java |  31 -
 .../streaming/StreamingEvaluationContext.java   | 229 ------
 .../streaming/StreamingTransformTranslator.java | 418 ----------
 .../StreamingWindowPipelineDetector.java        | 104 ---
 .../runners/spark/translation/DoFnFunction.java |  97 +++
 .../spark/translation/EvaluationContext.java    | 288 +++++++
 .../spark/translation/MultiDoFnFunction.java    | 119 +++
 .../spark/translation/SparkContextFactory.java  |  69 ++
 .../translation/SparkPipelineEvaluator.java     |  56 ++
 .../SparkPipelineOptionsFactory.java            |  31 +
 .../SparkPipelineOptionsRegistrar.java          |  31 +
 .../SparkPipelineRunnerRegistrar.java           |  31 +
 .../translation/SparkPipelineTranslator.java    |  30 +
 .../spark/translation/SparkProcessContext.java  | 262 ++++++
 .../spark/translation/SparkRuntimeContext.java  | 217 +++++
 .../spark/translation/TransformEvaluator.java   |  27 +
 .../spark/translation/TransformTranslator.java  | 808 +++++++++++++++++++
 .../spark/translation/WindowingHelpers.java     |  62 ++
 .../SparkStreamingPipelineOptionsFactory.java   |  31 +
 .../SparkStreamingPipelineOptionsRegistrar.java |  32 +
 .../streaming/StreamingEvaluationContext.java   | 229 ++++++
 .../streaming/StreamingTransformTranslator.java | 418 ++++++++++
 .../StreamingWindowPipelineDetector.java        | 104 +++
 ...ataflow.sdk.options.PipelineOptionsRegistrar |   4 +-
 ...dataflow.sdk.runners.PipelineRunnerRegistrar |   2 +-
 .../beam/runners/spark/CombineGloballyTest.java |  91 ---
 .../beam/runners/spark/CombinePerKeyTest.java   |  68 --
 .../apache/beam/runners/spark/DeDupTest.java    |   1 +
 .../beam/runners/spark/DoFnOutputTest.java      |  61 --
 .../beam/runners/spark/EmptyInputTest.java      |   1 +
 .../runners/spark/MultiOutputWordCountTest.java | 135 ----
 .../beam/runners/spark/SerializationTest.java   | 180 -----
 .../beam/runners/spark/SideEffectsTest.java     |  79 --
 .../beam/runners/spark/SimpleWordCountTest.java |   1 +
 .../spark/TestSparkPipelineOptionsFactory.java  |  37 -
 .../runners/spark/TransformTranslatorTest.java  |  98 ---
 .../runners/spark/WindowedWordCountTest.java    |  67 --
 .../beam/runners/spark/io/NumShardsTest.java    |   2 +-
 .../spark/streaming/FlattenStreamingTest.java   |  87 --
 .../spark/streaming/KafkaStreamingTest.java     | 139 ----
 .../streaming/SimpleStreamingWordCountTest.java |  76 --
 .../utils/DataflowAssertStreaming.java          |  42 -
 .../streaming/utils/EmbeddedKafkaCluster.java   | 317 --------
 .../spark/translation/CombineGloballyTest.java  |  94 +++
 .../spark/translation/CombinePerKeyTest.java    |  70 ++
 .../spark/translation/DoFnOutputTest.java       |  64 ++
 .../translation/MultiOutputWordCountTest.java   | 137 ++++
 .../spark/translation/SerializationTest.java    | 183 +++++
 .../spark/translation/SideEffectsTest.java      |  81 ++
 .../TestSparkPipelineOptionsFactory.java        |  38 +
 .../translation/TransformTranslatorTest.java    |  99 +++
 .../translation/WindowedWordCountTest.java      |  71 ++
 .../streaming/FlattenStreamingTest.java         |  88 ++
 .../streaming/KafkaStreamingTest.java           | 140 ++++
 .../streaming/SimpleStreamingWordCountTest.java |  77 ++
 .../utils/DataflowAssertStreaming.java          |  42 +
 .../streaming/utils/EmbeddedKafkaCluster.java   | 317 ++++++++
 76 files changed, 4496 insertions(+), 4464 deletions(-)
----------------------------------------------------------------------
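In summary, this commit moves the batch translation classes from org.apache.beam.runners.spark into the new org.apache.beam.runners.spark.translation package (and the streaming ones into translation.streaming), while SparkStreamingPipelineOptions moves up to sit beside SparkPipelineOptions. The move is largely mechanical: class names are unchanged, so for code that referenced the moved classes only the imports change. A minimal sketch (a hypothetical downstream file, not part of this commit):

    // Before this commit:
    // import org.apache.beam.runners.spark.EvaluationContext;
    // import org.apache.beam.runners.spark.TransformTranslator;

    // After this commit:
    import org.apache.beam.runners.spark.translation.EvaluationContext;
    import org.apache.beam.runners.spark.translation.TransformTranslator;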


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
deleted file mode 100644
index e5d4542..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.spark.api.java.function.FlatMapFunction;
-
-/**
- * Dataflow's Do functions correspond to Spark's FlatMap functions.
- *
- * @param <I> Input element type.
- * @param <O> Output element type.
- */
-public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValue<I>>,
-    WindowedValue<O>> {
-  private final DoFn<I, O> mFunction;
-  private final SparkRuntimeContext mRuntimeContext;
-  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
-
-  /**
-   * @param fn         DoFunction to be wrapped.
-   * @param runtime    Runtime to apply function in.
-   * @param sideInputs Side inputs used in DoFunction.
-   */
-  public DoFnFunction(DoFn<I, O> fn,
-               SparkRuntimeContext runtime,
-               Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
-    this.mFunction = fn;
-    this.mRuntimeContext = runtime;
-    this.mSideInputs = sideInputs;
-  }
-
-  @Override
-  public Iterable<WindowedValue<O>> call(Iterator<WindowedValue<I>> iter) throws
-      Exception {
-    ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
-    ctxt.setup();
-    mFunction.startBundle(ctxt);
-    return ctxt.getOutputIterable(iter, mFunction);
-  }
-
-  private class ProcCtxt extends SparkProcessContext<I, O, WindowedValue<O>> {
-
-    private final List<WindowedValue<O>> outputs = new LinkedList<>();
-
-    ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
-        BroadcastHelper<?>> sideInputs) {
-      super(fn, runtimeContext, sideInputs);
-    }
-
-    @Override
-    public synchronized void output(O o) {
-      outputs.add(windowedValue != null ? windowedValue.withValue(o) :
-          WindowedValue.valueInEmptyWindows(o));
-    }
-
-    @Override
-    public synchronized void output(WindowedValue<O> o) {
-      outputs.add(o);
-    }
-
-    @Override
-    protected void clearOutput() {
-      outputs.clear();
-    }
-
-    @Override
-    protected Iterator<WindowedValue<O>> getOutputIterator() {
-      return outputs.iterator();
-    }
-  }
-
-}
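DoFnFunction bridges a Dataflow DoFn to Spark by implementing FlatMapFunction over partitions of WindowedValues, which is the shape JavaRDD.mapPartitions expects. A hedged usage sketch (the variable names are assumptions; the real wiring lives in TransformTranslator):

    // Assumed context: 'fn' is a DoFn<String, Integer>, 'runtimeContext' and
    // 'sideInputs' come from the surrounding translator code, and 'input' is a
    // JavaRDD<WindowedValue<String>>.
    JavaRDD<WindowedValue<Integer>> output =
        input.mapPartitions(new DoFnFunction<>(fn, runtimeContext, sideInputs));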

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationContext.java
deleted file mode 100644
index ad49528..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationContext.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
-import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaSparkContext;
-
-
-/**
- * The evaluation context allows us to define how pipeline instructions are evaluated.
- */
-public class EvaluationContext implements EvaluationResult {
-  private final JavaSparkContext jsc;
-  private final Pipeline pipeline;
-  private final SparkRuntimeContext runtime;
-  private final Map<PValue, RDDHolder<?>> pcollections = new LinkedHashMap<>();
-  private final Set<RDDHolder<?>> leafRdds = new LinkedHashSet<>();
-  private final Set<PValue> multireads = new LinkedHashSet<>();
-  private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
-  private final Map<PValue, Iterable<? extends WindowedValue<?>>> pview = new LinkedHashMap<>();
-  protected AppliedPTransform<?, ?, ?> currentTransform;
-
-  public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
-    this.jsc = jsc;
-    this.pipeline = pipeline;
-    this.runtime = new SparkRuntimeContext(jsc, pipeline);
-  }
-
-  /**
-   * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are
-   * sometimes created from a collection of objects (using RDD parallelize) and then
-   * only used to create View objects; in which case they do not need to be
-   * converted to bytes since they are not transferred across the network until they are
-   * broadcast.
-   */
-  private class RDDHolder<T> {
-
-    private Iterable<T> values;
-    private Coder<T> coder;
-    private JavaRDDLike<WindowedValue<T>, ?> rdd;
-
-    RDDHolder(Iterable<T> values, Coder<T> coder) {
-      this.values = values;
-      this.coder = coder;
-    }
-
-    RDDHolder(JavaRDDLike<WindowedValue<T>, ?> rdd) {
-      this.rdd = rdd;
-    }
-
-    JavaRDDLike<WindowedValue<T>, ?> getRDD() {
-      if (rdd == null) {
-        Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values,
-            new Function<T, WindowedValue<T>>() {
-            @Override
-            public WindowedValue<T> apply(T t) {
-             // TODO: this is wrong if T is a TimestampedValue
-              return WindowedValue.valueInEmptyWindows(t);
-            }
-        });
-        WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
-            WindowedValue.getValueOnlyCoder(coder);
-        rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
-            .map(CoderHelpers.fromByteFunction(windowCoder));
-      }
-      return rdd;
-    }
-
-    Iterable<T> getValues(PCollection<T> pcollection) {
-      if (values == null) {
-        coder = pcollection.getCoder();
-        JavaRDDLike<byte[], ?> bytesRDD = rdd.map(WindowingHelpers.<T>unwindowFunction())
-            .map(CoderHelpers.toByteFunction(coder));
-        List<byte[]> clientBytes = bytesRDD.collect();
-        values = Iterables.transform(clientBytes, new Function<byte[], T>() {
-          @Override
-          public T apply(byte[] bytes) {
-            return CoderHelpers.fromByteArray(bytes, coder);
-          }
-        });
-      }
-      return values;
-    }
-
-    Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
-      return Iterables.transform(get(pcollection), new Function<T, WindowedValue<T>>() {
-        @Override
-        public WindowedValue<T> apply(T t) {
-          return WindowedValue.valueInEmptyWindows(t); // TODO: not the right place?
-        }
-      });
-    }
-  }
-
-  protected JavaSparkContext getSparkContext() {
-    return jsc;
-  }
-
-  protected Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  protected SparkRuntimeContext getRuntimeContext() {
-    return runtime;
-  }
-
-  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
-    this.currentTransform = transform;
-  }
-
-  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    return currentTransform;
-  }
-
-  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
-    checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
-        "can only be called with current transform");
-    @SuppressWarnings("unchecked")
-    I input = (I) currentTransform.getInput();
-    return input;
-  }
-
-  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
-    checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
-        "can only be called with current transform");
-    @SuppressWarnings("unchecked")
-    O output = (O) currentTransform.getOutput();
-    return output;
-  }
-
-  protected  <T> void setOutputRDD(PTransform<?, ?> transform,
-      JavaRDDLike<WindowedValue<T>, ?> rdd) {
-    setRDD((PValue) getOutput(transform), rdd);
-  }
-
-  protected  <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
-      Coder<T> coder) {
-    pcollections.put((PValue) getOutput(transform), new RDDHolder<>(values, coder));
-  }
-
-  void setPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
-    pview.put(view, value);
-  }
-
-  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
-    PValue pvalue = (PValue) getOutput(transform);
-    return pcollections.containsKey(pvalue);
-  }
-
-  protected JavaRDDLike<?, ?> getRDD(PValue pvalue) {
-    RDDHolder<?> rddHolder = pcollections.get(pvalue);
-    JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
-    leafRdds.remove(rddHolder);
-    if (multireads.contains(pvalue)) {
-      // Ensure the RDD is marked as cached
-      rdd.rdd().cache();
-    } else {
-      multireads.add(pvalue);
-    }
-    return rdd;
-  }
-
-  protected <T> void setRDD(PValue pvalue, JavaRDDLike<WindowedValue<T>, ?> rdd) {
-    try {
-      rdd.rdd().setName(pvalue.getName());
-    } catch (IllegalStateException e) {
-      // name not set, ignore
-    }
-    RDDHolder<T> rddHolder = new RDDHolder<>(rdd);
-    pcollections.put(pvalue, rddHolder);
-    leafRdds.add(rddHolder);
-  }
-
-  JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) {
-    return getRDD((PValue) getInput(transform));
-  }
-
-
-  <T> Iterable<? extends WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
-    return pview.get(view);
-  }
-
-  /**
-   * Computes the outputs for all RDDs that are leaves in the DAG and do not have any
-   * actions (like saving to a file) registered on them (i.e. they are performed for side
-   * effects).
-   */
-  protected void computeOutputs() {
-    for (RDDHolder<?> rddHolder : leafRdds) {
-      JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
-      rdd.rdd().cache(); // cache so that any subsequent get() is cheap
-      rdd.count(); // force the RDD to be computed
-    }
-  }
-
-  @Override
-  public <T> T get(PValue value) {
-    if (pobjects.containsKey(value)) {
-      @SuppressWarnings("unchecked")
-      T result = (T) pobjects.get(value);
-      return result;
-    }
-    if (pcollections.containsKey(value)) {
-      JavaRDDLike<?, ?> rdd = pcollections.get(value).getRDD();
-      @SuppressWarnings("unchecked")
-      T res = (T) Iterables.getOnlyElement(rdd.collect());
-      pobjects.put(value, res);
-      return res;
-    }
-    throw new IllegalStateException("Cannot resolve un-known PObject: " + value);
-  }
-
-  @Override
-  public <T> T getAggregatorValue(String named, Class<T> resultType) {
-    return runtime.getAggregatorValue(named, resultType);
-  }
-
-  @Override
-  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException {
-    return runtime.getAggregatorValues(aggregator);
-  }
-
-  @Override
-  public <T> Iterable<T> get(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked")
-    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getValues(pcollection);
-  }
-
-  <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked")
-    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getWindowedValues(pcollection);
-  }
-
-  @Override
-  public void close() {
-    SparkContextFactory.stopSparkContext(jsc);
-  }
-
-  /** The runner is blocking. */
-  @Override
-  public State getState() {
-    return State.DONE;
-  }
-}
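EvaluationContext implements EvaluationResult, so once the (blocking) runner finishes, materialized PCollection contents can be read back on the driver; getValues() above decodes the collected byte arrays with the PCollection's coder. A hedged read-back sketch, assuming the SparkPipelineRunner.create() factory used elsewhere in this module:

    // 'pipeline' and 'output' (a PCollection<String>) are assumed to exist.
    EvaluationResult result = SparkPipelineRunner.create().run(pipeline);
    Iterable<String> contents = result.get(output); // decoded via the coder
    result.close(); // stops the JavaSparkContext unless test reuse is enabled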

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/MultiDoFnFunction.java
deleted file mode 100644
index 47433a6..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/MultiDoFnFunction.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.joda.time.Instant;
-import scala.Tuple2;
-
-/**
- * DoFunctions ignore side outputs. MultiDoFunctions deal with side outputs by enriching the
- * underlying data with multiple TupleTags.
- *
- * @param <I> Input type for DoFunction.
- * @param <O> Output type for DoFunction.
- */
-class MultiDoFnFunction<I, O>
-    implements PairFlatMapFunction<Iterator<WindowedValue<I>>, TupleTag<?>, WindowedValue<?>> {
-  private final DoFn<I, O> mFunction;
-  private final SparkRuntimeContext mRuntimeContext;
-  private final TupleTag<O> mMainOutputTag;
-  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
-
-  MultiDoFnFunction(
-      DoFn<I, O> fn,
-      SparkRuntimeContext runtimeContext,
-      TupleTag<O> mainOutputTag,
-      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
-    this.mFunction = fn;
-    this.mRuntimeContext = runtimeContext;
-    this.mMainOutputTag = mainOutputTag;
-    this.mSideInputs = sideInputs;
-  }
-
-  @Override
-  public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>>
-      call(Iterator<WindowedValue<I>> iter) throws Exception {
-    ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
-    mFunction.startBundle(ctxt);
-    ctxt.setup();
-    return ctxt.getOutputIterable(iter, mFunction);
-  }
-
-  private class ProcCtxt extends SparkProcessContext<I, O, Tuple2<TupleTag<?>, WindowedValue<?>>> {
-
-    private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
-
-    ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
-        BroadcastHelper<?>> sideInputs) {
-      super(fn, runtimeContext, sideInputs);
-    }
-
-    @Override
-    public synchronized void output(O o) {
-      outputs.put(mMainOutputTag, windowedValue.withValue(o));
-    }
-
-    @Override
-    public synchronized void output(WindowedValue<O> o) {
-      outputs.put(mMainOutputTag, o);
-    }
-
-    @Override
-    public synchronized <T> void sideOutput(TupleTag<T> tag, T t) {
-      outputs.put(tag, windowedValue.withValue(t));
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
-      outputs.put(tupleTag, WindowedValue.of(t, instant,
-          windowedValue.getWindows(), windowedValue.getPane()));
-    }
-
-    @Override
-    protected void clearOutput() {
-      outputs.clear();
-    }
-
-    @Override
-    protected Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> getOutputIterator() {
-      return Iterators.transform(outputs.entries().iterator(),
-          new Function<Map.Entry<TupleTag<?>, WindowedValue<?>>,
-              Tuple2<TupleTag<?>, WindowedValue<?>>>() {
-        @Override
-        public Tuple2<TupleTag<?>, WindowedValue<?>> apply(Map.Entry<TupleTag<?>,
-            WindowedValue<?>> input) {
-          return new Tuple2<TupleTag<?>, WindowedValue<?>>(input.getKey(), input.getValue());
-        }
-      });
-    }
-
-  }
-}
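MultiDoFnFunction is the multi-output counterpart: a PairFlatMapFunction that pairs every value with its TupleTag, matching JavaRDD.mapPartitionsToPair. A hedged sketch (same assumed context as the DoFnFunction sketch above, plus 'mainTag', a TupleTag<Integer> for the main output; the class is package-private, so this only compiles from within the translation package):

    JavaPairRDD<TupleTag<?>, WindowedValue<?>> all =
        input.mapPartitionsToPair(
            new MultiDoFnFunction<>(fn, runtimeContext, mainTag, sideInputs));
    // Filtering 'all' by tag then recovers one RDD per (side) output.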

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextFactory.java
deleted file mode 100644
index 4393a75..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.serializer.KryoSerializer;
-
-final class SparkContextFactory {
-
-  /**
-   * If the property {@code dataflow.spark.test.reuseSparkContext} is set to
-   * {@code true} then the Spark context will be reused for dataflow pipelines.
-   * This property should only be enabled for tests.
-   */
-  static final String TEST_REUSE_SPARK_CONTEXT =
-      "dataflow.spark.test.reuseSparkContext";
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-
-  private SparkContextFactory() {
-  }
-
-  static synchronized JavaSparkContext getSparkContext(String master, String appName) {
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
-      if (sparkContext == null) {
-        sparkContext = createSparkContext(master, appName);
-        sparkMaster = master;
-      } else if (!master.equals(sparkMaster)) {
-        throw new IllegalArgumentException(String.format("Cannot reuse spark context " +
-                "with different spark master URL. Existing: %s, requested: %s.",
-            sparkMaster, master));
-      }
-      return sparkContext;
-    } else {
-      return createSparkContext(master, appName);
-    }
-  }
-
-  static synchronized void stopSparkContext(JavaSparkContext context) {
-    if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
-      context.stop();
-    }
-  }
-
-  private static JavaSparkContext createSparkContext(String master, String appName) {
-    SparkConf conf = new SparkConf();
-    conf.setMaster(master);
-    conf.setAppName(appName);
-    conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
-    return new JavaSparkContext(conf);
-  }
-}
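A hedged sketch of the test-only reuse switch above (SparkContextFactory and its methods are package-private, so this is callable only from tests in the same package):

    // Opt in to reusing one JavaSparkContext across pipelines in tests.
    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "true");
    JavaSparkContext jsc =
        SparkContextFactory.getSparkContext("local[2]", "test-app");
    // ... run one or more pipelines on 'jsc' ...
    SparkContextFactory.stopSparkContext(jsc); // a no-op while reuse is enabled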

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineEvaluator.java
deleted file mode 100644
index becf15a..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineEvaluator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-/**
- * Pipeline {@link SparkPipelineRunner.Evaluator} for Spark.
- */
-public final class SparkPipelineEvaluator extends SparkPipelineRunner.Evaluator {
-
-  private final EvaluationContext ctxt;
-
-  public SparkPipelineEvaluator(EvaluationContext ctxt, SparkPipelineTranslator translator) {
-    super(translator);
-    this.ctxt = ctxt;
-  }
-
-  @Override
-  protected <PT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformTreeNode
-      node) {
-    @SuppressWarnings("unchecked")
-    PT transform = (PT) node.getTransform();
-    @SuppressWarnings("unchecked")
-    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
-    @SuppressWarnings("unchecked") TransformEvaluator<PT> evaluator =
-        (TransformEvaluator<PT>) translator.translate(transformClass);
-    LOG.info("Evaluating {}", transform);
-    AppliedPTransform<PInput, POutput, PT> appliedTransform =
-        AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
-    ctxt.setCurrentTransform(appliedTransform);
-    evaluator.evaluate(transform, ctxt);
-    ctxt.setCurrentTransform(null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsFactory.java
deleted file mode 100644
index 9bff013..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-
-public final class SparkPipelineOptionsFactory {
-  private SparkPipelineOptionsFactory() {
-  }
-
-  public static SparkPipelineOptions create() {
-    return PipelineOptionsFactory.as(SparkPipelineOptions.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsRegistrar.java
deleted file mode 100644
index c68af64..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsRegistrar.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
-import com.google.common.collect.ImmutableList;
-
-public class SparkPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
-  @Override
-  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-    return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index b1a402f..d5e4186 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -28,10 +28,10 @@ import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.cloud.dataflow.sdk.values.POutput;
 import com.google.cloud.dataflow.sdk.values.PValue;
 
-import org.apache.beam.runners.spark.streaming.SparkStreamingPipelineOptions;
-import org.apache.beam.runners.spark.streaming.StreamingEvaluationContext;
-import org.apache.beam.runners.spark.streaming.StreamingTransformTranslator;
-import org.apache.beam.runners.spark.streaming.StreamingWindowPipelineDetector;
+import org.apache.beam.runners.spark.translation.*;
+import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
+import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
 
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunnerRegistrar.java
deleted file mode 100644
index 7861685..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunnerRegistrar.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
-import com.google.common.collect.ImmutableList;
-
-public class SparkPipelineRunnerRegistrar implements PipelineRunnerRegistrar {
-  @Override
-  public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-    return ImmutableList.<Class<? extends PipelineRunner<?>>>of(SparkPipelineRunner.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineTranslator.java
deleted file mode 100644
index 2e38a07..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineTranslator.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-/**
- * Translator to support translation between Dataflow transformations and Spark transformations.
- */
-public interface SparkPipelineTranslator {
-
-  boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz);
-
-  <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz);
-}
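One plausible shape for an implementation is a registry keyed by transform class; the real batch translator lives in TransformTranslator, so this map-backed version is illustrative only:

    import java.util.HashMap;
    import java.util.Map;

    import com.google.cloud.dataflow.sdk.transforms.PTransform;
    // SparkPipelineTranslator and TransformEvaluator now live in
    // org.apache.beam.runners.spark.translation after this commit.

    final class MapBackedTranslator implements SparkPipelineTranslator {
      private final Map<Class<? extends PTransform<?, ?>>, TransformEvaluator<?>>
          evaluators = new HashMap<>();

      @Override
      public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
        return evaluators.containsKey(clazz);
      }

      @Override
      public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(
          Class<PT> clazz) {
        @SuppressWarnings("unchecked")
        TransformEvaluator<PT> evaluator =
            (TransformEvaluator<PT>) evaluators.get(clazz);
        if (evaluator == null) {
          throw new IllegalStateException("No evaluator registered for " + clazz);
        }
        return evaluator;
      }
    }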

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkProcessContext.java
deleted file mode 100644
index b3a720d..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkProcessContext.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.util.state.*;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterables;
-
-import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
-
-  private final DoFn<I, O> fn;
-  private final SparkRuntimeContext mRuntimeContext;
-  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
-
-  protected WindowedValue<I> windowedValue;
-
-  SparkProcessContext(DoFn<I, O> fn,
-      SparkRuntimeContext runtime,
-      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
-    fn.super();
-    this.fn = fn;
-    this.mRuntimeContext = runtime;
-    this.mSideInputs = sideInputs;
-  }
-
-  void setup() {
-    setupDelegateAggregators();
-  }
-
-  @Override
-  public PipelineOptions getPipelineOptions() {
-    return mRuntimeContext.getPipelineOptions();
-  }
-
-  @Override
-  public <T> T sideInput(PCollectionView<T> view) {
-    @SuppressWarnings("unchecked")
-    BroadcastHelper<Iterable<WindowedValue<?>>> broadcastHelper =
-        (BroadcastHelper<Iterable<WindowedValue<?>>>) mSideInputs.get(view.getTagInternal());
-    Iterable<WindowedValue<?>> contents = broadcastHelper.getValue();
-    return view.fromIterableInternal(contents);
-  }
-
-  @Override
-  public abstract void output(O output);
-
-  public abstract void output(WindowedValue<O> output);
-
-  @Override
-  public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
-    String message = "sideOutput is an unsupported operation for doFunctions, use a " +
-        "MultiDoFunction instead.";
-    LOG.warn(message);
-    throw new UnsupportedOperationException(message);
-  }
-
-  @Override
-  public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
-    String message =
-        "sideOutputWithTimestamp is an unsupported operation for doFunctions, use a " +
-            "MultiDoFunction instead.";
-    LOG.warn(message);
-    throw new UnsupportedOperationException(message);
-  }
-
-  @Override
-  public <AI, AO> Aggregator<AI, AO> createAggregatorInternal(
-      String named,
-      Combine.CombineFn<AI, ?, AO> combineFn) {
-    return mRuntimeContext.createAggregator(named, combineFn);
-  }
-
-  @Override
-  public I element() {
-    return windowedValue.getValue();
-  }
-
-  @Override
-  public void outputWithTimestamp(O output, Instant timestamp) {
-    output(WindowedValue.of(output, timestamp,
-        windowedValue.getWindows(), windowedValue.getPane()));
-  }
-
-  @Override
-  public Instant timestamp() {
-    return windowedValue.getTimestamp();
-  }
-
-  @Override
-  public BoundedWindow window() {
-    if (!(fn instanceof DoFn.RequiresWindowAccess)) {
-      throw new UnsupportedOperationException(
-          "window() is only available in the context of a DoFn marked as RequiresWindow.");
-    }
-    return Iterables.getOnlyElement(windowedValue.getWindows());
-  }
-
-  @Override
-  public PaneInfo pane() {
-    return windowedValue.getPane();
-  }
-
-  @Override
-  public WindowingInternals<I, O> windowingInternals() {
-    return new WindowingInternals<I, O>() {
-
-      @Override
-      public Collection<? extends BoundedWindow> windows() {
-        return windowedValue.getWindows();
-      }
-
-      @Override
-      public void outputWindowedValue(O output, Instant timestamp, Collection<?
-          extends BoundedWindow> windows, PaneInfo paneInfo) {
-        output(WindowedValue.of(output, timestamp, windows, paneInfo));
-      }
-
-      @Override
-      public StateInternals stateInternals() {
-        //TODO: implement state internals.
-        // This is a temporary placeholder to get the TfIdfTest
-        // working for the initial Beam code drop.
-        return InMemoryStateInternals.forKey("DUMMY");
-      }
-
-      @Override
-      public TimerInternals timerInternals() {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#timerInternals() is not yet supported.");
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return windowedValue.getPane();
-      }
-
-      @Override
-      public <T> void writePCollectionViewData(TupleTag<?> tag,
-          Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#writePCollectionViewData() is not yet supported.");
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#sideInput() is not yet supported.");
-      }
-    };
-  }
-
-  protected abstract void clearOutput();
-  protected abstract Iterator<V> getOutputIterator();
-
-  protected Iterable<V> getOutputIterable(final Iterator<WindowedValue<I>> iter,
-      final DoFn<I, O> doFn) {
-    return new Iterable<V>() {
-      @Override
-      public Iterator<V> iterator() {
-        return new ProcCtxtIterator(iter, doFn);
-      }
-    };
-  }
-
-  private class ProcCtxtIterator extends AbstractIterator<V> {
-
-    private final Iterator<WindowedValue<I>> inputIterator;
-    private final DoFn<I, O> doFn;
-    private Iterator<V> outputIterator;
-    private boolean calledFinish;
-
-    ProcCtxtIterator(Iterator<WindowedValue<I>> iterator, DoFn<I, O> doFn) {
-      this.inputIterator = iterator;
-      this.doFn = doFn;
-      this.outputIterator = getOutputIterator();
-    }
-
-    @Override
-    protected V computeNext() {
-      // Process each element from the (input) iterator, which produces zero, one or more
-      // output elements (of type V) in the output iterator. Note that the output
-      // collection (and iterator) is reset between each call to processElement, so the
-      // collection only holds the output values for each call to processElement, rather
-      // than for the whole partition (which would use too much memory).
-      while (true) {
-        if (outputIterator.hasNext()) {
-          return outputIterator.next();
-        } else if (inputIterator.hasNext()) {
-          clearOutput();
-          windowedValue = inputIterator.next();
-          try {
-            doFn.processElement(SparkProcessContext.this);
-          } catch (Exception e) {
-            throw new SparkProcessException(e);
-          }
-          outputIterator = getOutputIterator();
-        } else {
-          // no more input to consume, but finishBundle can produce more output
-          if (!calledFinish) {
-            clearOutput();
-            try {
-              calledFinish = true;
-              doFn.finishBundle(SparkProcessContext.this);
-            } catch (Exception e) {
-              throw new SparkProcessException(e);
-            }
-            outputIterator = getOutputIterator();
-            continue; // try to consume outputIterator from start of loop
-          }
-          return endOfData();
-        }
-      }
-    }
-  }
-
-  static class SparkProcessException extends RuntimeException {
-    SparkProcessException(Throwable t) {
-      super(t);
-    }
-  }
-
-}
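The ProcCtxtIterator above leans on Guava's AbstractIterator contract: computeNext() is invoked lazily on demand, and iteration ends once endOfData() is returned. In isolation that contract looks like this (a self-contained illustration, unrelated to Beam):

    // Lazily yields 3, 2, 1, then signals exhaustion via endOfData().
    AbstractIterator<Integer> countdown = new AbstractIterator<Integer>() {
      private int n = 3;

      @Override
      protected Integer computeNext() {
        if (n > 0) {
          return n--;
        }
        return endOfData();
      }
    };

This is what lets the runner interleave processElement and finishBundle with downstream consumption instead of buffering a whole partition in memory.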

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRuntimeContext.java
deleted file mode 100644
index f0f9974..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRuntimeContext.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.Min;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.beam.runners.spark.aggregators.AggAccumParam;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-
-import org.apache.spark.Accumulator;
-import org.apache.spark.api.java.JavaSparkContext;
-
-
-/**
- * The SparkRuntimeContext allows us to define useful features on the client side before our
- * data flow program is launched.
- */
-public class SparkRuntimeContext implements Serializable {
-  /**
-   * An accumulator that is a map from names to aggregators.
-   */
-  private final Accumulator<NamedAggregators> accum;
-
-  private final String serializedPipelineOptions;
-
-  /**
-   * Map of names to dataflow aggregators.
-   */
-  private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>();
-  private transient CoderRegistry coderRegistry;
-
-  SparkRuntimeContext(JavaSparkContext jsc, Pipeline pipeline) {
-    this.accum = jsc.accumulator(new NamedAggregators(), new AggAccumParam());
-    this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions());
-  }
-
-  private static String serializePipelineOptions(PipelineOptions pipelineOptions) {
-    try {
-      return new ObjectMapper().writeValueAsString(pipelineOptions);
-    } catch (JsonProcessingException e) {
-      throw new IllegalStateException("Failed to serialize the pipeline options.", e);
-    }
-  }
-
-  private static PipelineOptions deserializePipelineOptions(String serializedPipelineOptions) {
-    try {
-      return new ObjectMapper().readValue(serializedPipelineOptions, PipelineOptions.class);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
-    }
-  }
-
-  /**
-   * Retrieves corresponding value of an aggregator.
-   *
-   * @param aggregatorName Name of the aggregator to retrieve the value of.
-   * @param typeClass      Type class of value to be retrieved.
-   * @param <T>            Type of object to be returned.
-   * @return The value of the aggregator.
-   */
-  public <T> T getAggregatorValue(String aggregatorName, Class<T> typeClass) {
-    return accum.value().getValue(aggregatorName, typeClass);
-  }
-
-  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) {
-    @SuppressWarnings("unchecked")
-    Class<T> aggValueClass = (Class<T>) aggregator.getCombineFn().getOutputType().getRawType();
-    final T aggregatorValue = getAggregatorValue(aggregator.getName(), aggValueClass);
-    return new AggregatorValues<T>() {
-      @Override
-      public Collection<T> getValues() {
-        return ImmutableList.of(aggregatorValue);
-      }
-
-      @Override
-      public Map<String, T> getValuesAtSteps() {
-        throw new UnsupportedOperationException("getValuesAtSteps is not supported.");
-      }
-    };
-  }
-
-  public synchronized PipelineOptions getPipelineOptions() {
-    return deserializePipelineOptions(serializedPipelineOptions);
-  }
-
-  /**
-   * Creates an aggregator and associates it with the specified name.
-   *
-   * @param named     Name of aggregator.
-   * @param combineFn Combine function used in aggregation.
-   * @param <IN>      Type of inputs to aggregator.
-   * @param <INTER>   Intermediate data type
-   * @param <OUT>     Type of aggregator outputs.
-   * @return Specified aggregator
-   */
-  public synchronized <IN, INTER, OUT> Aggregator<IN, OUT> createAggregator(
-      String named,
-      Combine.CombineFn<? super IN, INTER, OUT> combineFn) {
-    @SuppressWarnings("unchecked")
-    Aggregator<IN, OUT> aggregator = (Aggregator<IN, OUT>) aggregators.get(named);
-    if (aggregator == null) {
-      @SuppressWarnings("unchecked")
-      NamedAggregators.CombineFunctionState<IN, INTER, OUT> state =
-          new NamedAggregators.CombineFunctionState<>(
-              (Combine.CombineFn<IN, INTER, OUT>) combineFn,
-              (Coder<IN>) getCoder(combineFn),
-              this);
-      accum.add(new NamedAggregators(named, state));
-      aggregator = new SparkAggregator<>(named, state);
-      aggregators.put(named, aggregator);
-    }
-    return aggregator;
-  }
-
-  public CoderRegistry getCoderRegistry() {
-    if (coderRegistry == null) {
-      coderRegistry = new CoderRegistry();
-      coderRegistry.registerStandardCoders();
-    }
-    return coderRegistry;
-  }
-
-  private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
-    try {
-      if (combiner.getClass() == Sum.SumIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Sum.SumLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else if (combiner.getClass() == Min.MinIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Min.MinLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Min.MinDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Max.MaxLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else {
-        throw new IllegalArgumentException("unsupported combiner in Aggregator: "
-            + combiner.getClass().getName());
-      }
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalStateException("Could not determine default coder for combiner", e);
-    }
-  }
-
-  /**
-   * Initializes Spark aggregators exactly once.
-   *
-   * @param <IN> Type of element fed in to aggregator.
-   */
-  private static class SparkAggregator<IN, OUT> implements Aggregator<IN, OUT>, Serializable {
-    private final String name;
-    private final NamedAggregators.State<IN, ?, OUT> state;
-
-    SparkAggregator(String name, NamedAggregators.State<IN, ?, OUT> state) {
-      this.name = name;
-      this.state = state;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public void addValue(IN elem) {
-      state.update(elem);
-    }
-
-    @Override
-    public Combine.CombineFn<IN, ?, OUT> getCombineFn() {
-      return state.getCombineFn();
-    }
-  }
-}
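A hedged sketch of the aggregator plumbing above, as it might be used within this runner (Sum.SumLongFn is one of the combiners getCoder() recognizes; 'runtimeContext' is assumed to be the SparkRuntimeContext in scope):

    // On a worker, typically reached via createAggregatorInternal in
    // SparkProcessContext:
    Aggregator<Long, Long> recordCount =
        runtimeContext.createAggregator("recordCount", new Sum.SumLongFn());
    recordCount.addValue(1L);

    // On the driver, after execution, backed by the Spark Accumulator:
    Long total = runtimeContext.getAggregatorValue("recordCount", Long.class);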

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
new file mode 100644
index 0000000..be40313
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+
+/**
+ * Options used to configure Spark streaming.
+ */
+public interface SparkStreamingPipelineOptions extends SparkPipelineOptions {
+  @Description("Timeout to wait (in msec) for the streaming execution so stop, -1 runs until " +
+          "execution is stopped")
+  @Default.Long(-1)
+  Long getTimeout();
+
+  void setTimeout(Long timeout);
+
+  @Override
+  @Default.Boolean(true)
+  boolean isStreaming();
+
+  @Override
+  @Default.String("spark streaming dataflow pipeline job")
+  String getAppName();
+}
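A hedged construction sketch for the interface above (PipelineOptionsFactory.as is the standard SDK factory; the dedicated SparkStreamingPipelineOptionsFactory appears elsewhere in this diff):

    SparkStreamingPipelineOptions options =
        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
    options.setTimeout(5000L); // stop the streaming execution after 5 seconds
    // isStreaming() defaults to true for these options.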

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformEvaluator.java
deleted file mode 100644
index 4b4f81f..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformEvaluator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.io.Serializable;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-public interface TransformEvaluator<PT extends PTransform<?, ?>> extends Serializable {
-  void evaluate(PT transform, EvaluationContext context);
-}
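A minimal, purely illustrative implementation of the interface; the real evaluators are registered per transform class in TransformTranslator:

    TransformEvaluator<PTransform<?, ?>> noop =
        new TransformEvaluator<PTransform<?, ?>>() {
          @Override
          public void evaluate(PTransform<?, ?> transform,
              EvaluationContext context) {
            // Deliberately does nothing; a real evaluator builds RDDs on the
            // context for the transform's outputs.
          }
        };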

