beam-commits mailing list archives

From amits...@apache.org
Subject [16/23] incubator-beam git commit: [BEAM-11] second iteration of package reorganisation
Date Tue, 15 Mar 2016 18:48:13 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
new file mode 100644
index 0000000..6ba04b7
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * Helper functions for working with windows.
+ */
+public final class WindowingHelpers {
+  private WindowingHelpers() {
+  }
+
+  /**
+   * A function for converting a value to a {@link WindowedValue}. The resulting
+   * {@link WindowedValue} will be in no windows, and will have the default timestamp
+   * and pane.
+   *
+   * @param <T>   The type of the object.
+   * @return A function that accepts an object and returns its {@link WindowedValue}.
+   */
+  public static <T> Function<T, WindowedValue<T>> windowFunction() {
+    return new Function<T, WindowedValue<T>>() {
+      @Override
+      public WindowedValue<T> call(T t) {
+        return WindowedValue.valueInEmptyWindows(t);
+      }
+    };
+  }
+
+  /**
+   * A function for extracting the value from a {@link WindowedValue}.
+   *
+   * @param <T>   The type of the object.
+   * @return A function that accepts a {@link WindowedValue} and returns its value.
+   */
+  public static <T> Function<WindowedValue<T>, T> unwindowFunction() {
+    return new Function<WindowedValue<T>, T>() {
+      @Override
+      public T call(WindowedValue<T> t) {
+        return t.getValue();
+      }
+    };
+  }
+}
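
These helpers compose with Spark's map() to move between raw values and their WindowedValue wrappers. A minimal usage sketch, assuming a JavaSparkContext named jsc plus the imports above and java.util.Arrays:

    JavaRDD<String> lines = jsc.parallelize(Arrays.asList("hi", "there"));
    // wrap every element in a WindowedValue (no windows, default timestamp and pane)
    JavaRDD<WindowedValue<String>> windowed =
        lines.map(WindowingHelpers.<String>windowFunction());
    // strip the windowing metadata back off
    JavaRDD<String> unwound = windowed.map(WindowingHelpers.<String>unwindowFunction());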

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java
new file mode 100644
index 0000000..2c34caf
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
+
+public final class SparkStreamingPipelineOptionsFactory {
+
+  private SparkStreamingPipelineOptionsFactory() {
+  }
+
+  public static SparkStreamingPipelineOptions create() {
+    return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
+  }
+}
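
A usage sketch for the factory; setRunner() is the standard Dataflow PipelineOptions setter, and SparkPipelineRunner is the runner class in this package (any streaming-specific setters on SparkStreamingPipelineOptions are not shown, since they are defined elsewhere in the commit):

    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
    options.setRunner(SparkPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(options);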

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..e39d3ed
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
+
+public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.<Class<? extends PipelineOptions>>of(
+        SparkStreamingPipelineOptions.class);
+  }
+}
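
This registrar is discovered through Java's ServiceLoader mechanism via the META-INF/services entry updated later in this commit; roughly the lookup that PipelineOptionsFactory performs under the hood (java.util.ServiceLoader assumed imported):

    ServiceLoader<PipelineOptionsRegistrar> registrars =
        ServiceLoader.load(PipelineOptionsRegistrar.class);
    for (PipelineOptionsRegistrar registrar : registrars) {
      for (Class<? extends PipelineOptions> opts : registrar.getPipelineOptions()) {
        System.out.println("registered options: " + opts.getName());
      }
    }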

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
new file mode 100644
index 0000000..0e87355
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.cloud.dataflow.sdk.values.PValue;
+
+import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+
+/**
+ * An {@link EvaluationContext} for streaming pipelines, keeping track of the
+ * DStream created for each {@link PValue}.
+ */
+public class StreamingEvaluationContext extends EvaluationContext {
+
+  private final JavaStreamingContext jssc;
+  private final long timeout;
+  private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>();
+  private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>();
+
+  public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
+      JavaStreamingContext jssc, long timeout) {
+    super(jsc, pipeline);
+    this.jssc = jssc;
+    this.timeout = timeout;
+  }
+
+  /**
+   * Holds a DStream. Can also create the DStream from a supplied queue of values, though
+   * that is mainly intended for testing.
+   */
+  private class DStreamHolder<T> {
+
+    private Iterable<Iterable<T>> values;
+    private Coder<T> coder;
+    private JavaDStream<WindowedValue<T>> dStream;
+
+    DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
+      this.values = values;
+      this.coder = coder;
+    }
+
+    DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
+      this.dStream = dStream;
+    }
+
+    @SuppressWarnings("unchecked")
+    JavaDStream<WindowedValue<T>> getDStream() {
+      if (dStream == null) {
+        // create the DStream from values
+        Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+        for (Iterable<T> v : values) {
+          setOutputRDDFromValues(currentTransform.getTransform(), v, coder);
+          rddQueue.offer((JavaRDD<WindowedValue<T>>) getOutputRDD(currentTransform.getTransform()));
+        }
+        // create dstream from queue, one at a time, no defaults
+        // mainly for unit test so no reason to have this configurable
+        dStream = jssc.queueStream(rddQueue, true);
+      }
+      return dStream;
+    }
+  }
+
+  <T> void setDStreamFromQueue(
+      PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
+    pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder));
+  }
+
+  <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) {
+    PValue pvalue = (PValue) getOutput(transform);
+    DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream);
+    pstreams.put(pvalue, dStreamHolder);
+    leafStreams.add(dStreamHolder);
+  }
+
+  boolean hasStream(PTransform<?, ?> transform) {
+    PValue pvalue = (PValue) getInput(transform);
+    return pstreams.containsKey(pvalue);
+  }
+
+  JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
+    return getStream((PValue) getInput(transform));
+  }
+
+  JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
+    DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
+    JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
+    leafStreams.remove(dStreamHolder);
+    return dStream;
+  }
+
+  // used to set the RDD from the DStream in the RDDHolder for transformation
+  <T> void setInputRDD(
+      PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) {
+    setRDD((PValue) getInput(transform), rdd);
+  }
+
+  // used to get the RDD transformation output and use it as the DStream transformation output
+  JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
+    return getRDD((PValue) getOutput(transform));
+  }
+
+  public JavaStreamingContext getStreamingContext() {
+    return jssc;
+  }
+
+  @Override
+  public void computeOutputs() {
+    for (DStreamHolder<?> streamHolder : leafStreams) {
+      computeOutput(streamHolder);
+    }
+  }
+
+  private static <T> void computeOutput(DStreamHolder<T> streamHolder) {
+    streamHolder.getDStream().foreachRDD(new Function<JavaRDD<WindowedValue<T>>, Void>() {
+      @Override
+      public Void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
+        rdd.rdd().cache();
+        rdd.count();
+        return null;
+      }
+    }); // force a DStream action
+  }
+
+  @Override
+  public void close() {
+    if (timeout > 0) {
+      jssc.awaitTerminationOrTimeout(timeout);
+    } else {
+      jssc.awaitTermination();
+    }
+    //TODO: stop gracefully ?
+    jssc.stop(false, false);
+    state = State.DONE;
+    super.close();
+  }
+
+  private State state = State.RUNNING;
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  //---------------- overridden in order to expose within the package
+  @Override
+  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
+    return super.getInput(transform);
+  }
+  @Override
+  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
+    return super.getOutput(transform);
+  }
+
+  @Override
+  protected JavaSparkContext getSparkContext() {
+    return super.getSparkContext();
+  }
+
+  @Override
+  protected SparkRuntimeContext getRuntimeContext() {
+    return super.getRuntimeContext();
+  }
+
+  @Override
+  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+    super.setCurrentTransform(transform);
+  }
+
+  @Override
+  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return super.getCurrentTransform();
+  }
+
+  @Override
+  protected <T> void setOutputRDD(PTransform<?, ?> transform,
+      JavaRDDLike<WindowedValue<T>, ?> rdd) {
+    super.setOutputRDD(transform, rdd);
+  }
+
+  @Override
+  protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
+      Coder<T> coder) {
+    super.setOutputRDDFromValues(transform, values, coder);
+  }
+
+  @Override
+  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
+    return super.hasOutputRDD(transform);
+  }
+}
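
The queue-backed branch of DStreamHolder.getDStream() builds on Spark's queueStream. The same pattern in isolation, assuming a JavaSparkContext jsc and a JavaStreamingContext jssc:

    Queue<JavaRDD<Integer>> rddQueue = new LinkedBlockingQueue<>();
    rddQueue.offer(jsc.parallelize(Arrays.asList(1, 2, 3)));
    rddQueue.offer(jsc.parallelize(Arrays.asList(4, 5)));
    // one RDD per batch, no default RDD once the queue is drained --
    // the same flags the holder passes above
    JavaDStream<Integer> stream = jssc.queueStream(rddQueue, true);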

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
new file mode 100644
index 0000000..349bb7c
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.api.client.util.Lists;
+import com.google.api.client.util.Maps;
+import com.google.api.client.util.Sets;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+import com.google.common.reflect.TypeToken;
+import kafka.serializer.Decoder;
+
+import org.apache.beam.runners.spark.io.ConsoleIO;
+import org.apache.beam.runners.spark.io.CreateStream;
+import org.apache.beam.runners.spark.io.KafkaIO;
+import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
+import org.apache.beam.runners.spark.translation.DoFnFunction;
+import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.translation.TransformEvaluator;
+import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.runners.spark.translation.WindowingHelpers;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
+import org.apache.spark.streaming.api.java.JavaPairInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+
+import scala.Tuple2;
+
+
+/**
+ * Supports translation between a Dataflow transform and Spark's operations on DStreams.
+ */
+public final class StreamingTransformTranslator {
+
+  private StreamingTransformTranslator() {
+  }
+
+  private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
+    return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
+      @Override
+      public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
+        @SuppressWarnings("unchecked")
+        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
+            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
+            ((StreamingEvaluationContext) context).getStream(transform);
+        dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum());
+      }
+    };
+  }
+
+  private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
+    return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
+      @Override
+      public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        JavaStreamingContext jssc = sec.getStreamingContext();
+        Class<K> keyClazz = transform.getKeyClass();
+        Class<V> valueClazz = transform.getValueClass();
+        Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
+        Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass();
+        Map<String, String> kafkaParams = transform.getKafkaParams();
+        Set<String> topics = transform.getTopics();
+        JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz,
+                valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics);
+        JavaDStream<WindowedValue<KV<K, V>>> inputStream =
+            inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() {
+          @Override
+          public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
+            return KV.of(t2._1(), t2._2());
+          }
+        }).map(WindowingHelpers.<KV<K, V>>windowFunction());
+        sec.setStream(transform, inputStream);
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<Create.Values<T>> create() {
+    return new TransformEvaluator<Create.Values<T>>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void evaluate(Create.Values<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        Iterable<T> elems = transform.getElements();
+        Coder<T> coder = sec.getOutput(transform).getCoder();
+        if (coder != VoidCoder.of()) {
+          // actual create
+          sec.setOutputRDDFromValues(transform, elems, coder);
+        } else {
+          // fake create as an input: create a stream with a single batch containing a
+          // single null element, so that subsequent transformations are invoked once
+          // (this is what supports DataflowAssert)
+          sec.setDStreamFromQueue(transform,
+              Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)),
+              (Coder<Void>) coder);
+        }
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
+    return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
+      @Override
+      public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        Iterable<Iterable<T>> values = transform.getQueuedValues();
+        Coder<T> coder = sec.getOutput(transform).getCoder();
+        sec.setDStreamFromQueue(transform, values, coder);
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
+    return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        PCollectionList<T> pcs = sec.getInput(transform);
+        JavaDStream<WindowedValue<T>> first =
+            (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0));
+        List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1);
+        for (int i = 1; i < pcs.size(); i++) {
+          rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i)));
+        }
+        JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest);
+        sec.setStream(transform, dstream);
+      }
+    };
+  }
+
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform(
+      final SparkPipelineTranslator rddTranslator) {
+    return new TransformEvaluator<PT>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void evaluate(PT transform, EvaluationContext context) {
+        TransformEvaluator<PT> rddEvaluator =
+            rddTranslator.translate((Class<PT>) transform.getClass());
+
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        if (sec.hasStream(transform)) {
+          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
+              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
+              sec.getStream(transform);
+
+          sec.setStream(transform, dStream
+              .transform(new RDDTransform<>(sec, rddEvaluator, transform)));
+        } else {
+          // if the transformation requires direct access to RDD (not in stream)
+          // this is used for "fake" transformations like with DataflowAssert
+          rddEvaluator.evaluate(transform, context);
+        }
+      }
+    };
+  }
+
+  /**
+   * RDD transform function. If the transformation does not produce an output RDD, a fake
+   * output is created as an empty RDD.
+   *
+   * @param <PT> PTransform type
+   */
+  private static final class RDDTransform<PT extends PTransform<?, ?>>
+      implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
+
+    private final StreamingEvaluationContext context;
+    private final AppliedPTransform<?, ?, ?> appliedPTransform;
+    private final TransformEvaluator<PT> rddEvaluator;
+    private final PT transform;
+
+
+    private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator,
+        PT transform) {
+      this.context = context;
+      this.appliedPTransform = context.getCurrentTransform();
+      this.rddEvaluator = rddEvaluator;
+      this.transform = transform;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public JavaRDD<WindowedValue<Object>>
+        call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
+      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
+      context.setCurrentTransform(appliedPTransform);
+      context.setInputRDD(transform, rdd);
+      rddEvaluator.evaluate(transform, context);
+      if (!context.hasOutputRDD(transform)) {
+        // fake RDD as output
+        context.setOutputRDD(transform,
+            context.getSparkContext().<WindowedValue<Object>>emptyRDD());
+      }
+      JavaRDD<WindowedValue<Object>> outRDD =
+          (JavaRDD<WindowedValue<Object>>) context.getOutputRDD(transform);
+      context.setCurrentTransform(existingAPT);
+      return outRDD;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD(
+      final SparkPipelineTranslator rddTranslator) {
+    return new TransformEvaluator<PT>() {
+      @Override
+      public void evaluate(PT transform, EvaluationContext context) {
+        TransformEvaluator<PT> rddEvaluator =
+            rddTranslator.translate((Class<PT>) transform.getClass());
+
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        if (sec.hasStream(transform)) {
+          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
+              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
+              sec.getStream(transform);
+
+          dStream.foreachRDD(new RDDOutputOperator<>(sec, rddEvaluator, transform));
+        } else {
+          rddEvaluator.evaluate(transform, context);
+        }
+      }
+    };
+  }
+
+  /**
+   * RDD output function.
+   *
+   * @param <PT> PTransform type
+   */
+  private static final class RDDOutputOperator<PT extends PTransform<?, ?>>
+      implements Function<JavaRDD<WindowedValue<Object>>, Void> {
+
+    private final StreamingEvaluationContext context;
+    private final AppliedPTransform<?, ?, ?> appliedPTransform;
+    private final TransformEvaluator<PT> rddEvaluator;
+    private final PT transform;
+
+
+    private RDDOutputOperator(StreamingEvaluationContext context,
+        TransformEvaluator<PT> rddEvaluator, PT transform) {
+      this.context = context;
+      this.appliedPTransform = context.getCurrentTransform();
+      this.rddEvaluator = rddEvaluator;
+      this.transform = transform;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
+      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
+      context.setCurrentTransform(appliedPTransform);
+      context.setInputRDD(transform, rdd);
+      rddEvaluator.evaluate(transform, context);
+      context.setCurrentTransform(existingAPT);
+      return null;
+    }
+  }
+
+  private static final TransformTranslator.FieldGetter WINDOW_FG =
+      new TransformTranslator.FieldGetter(Window.Bound.class);
+
+  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
+    return new TransformEvaluator<Window.Bound<T>>() {
+      @Override
+      public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        //--- first we apply windowing to the stream
+        WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<T>> dStream =
+            (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
+        if (windowFn instanceof FixedWindows) {
+          Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize()
+              .getMillis());
+          sec.setStream(transform, dStream.window(windowDuration));
+        } else if (windowFn instanceof SlidingWindows) {
+          Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize()
+              .getMillis());
+          Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod()
+              .getMillis());
+          sec.setStream(transform, dStream.window(windowDuration, slideDuration));
+        }
+        //--- then we apply windowing to the elements
+        DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
+        DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn,
+            sec.getRuntimeContext(), null);
+        @SuppressWarnings("unchecked")
+        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
+            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
+            sec.getStream(transform);
+        sec.setStream(transform, dstream.mapPartitions(dofn));
+      }
+    };
+  }
+
+  private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS =
+      Maps.newHashMap();
+
+  static {
+    EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
+    EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
+    EVALUATORS.put(Create.Values.class, create());
+    EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
+    EVALUATORS.put(Window.Bound.class, window());
+    EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
+  }
+
+  private static final Set<Class<? extends PTransform>> UNSUPPORTED_EVALUATORS =
+      Sets.newHashSet();
+
+  static {
+    //TODO - add support for the following
+    UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
+      getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) {
+    TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
+    if (transform == null) {
+      if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
+        throw new UnsupportedOperationException("Dataflow transformation "
+            + clazz.getCanonicalName()
+            + " is currently unsupported by the Spark streaming pipeline");
+      }
+      // DStream transformations will transform an RDD into another RDD
+      // Actions will create output
+      // In Dataflow it depends on the PTransform's Input and Output class
+      Class<?> pTOutputClazz = getPTransformOutputClazz(clazz);
+      if (PDone.class.equals(pTOutputClazz)) {
+        return foreachRDD(rddTranslator);
+      } else {
+        return rddTransform(rddTranslator);
+      }
+    }
+    return transform;
+  }
+
+  private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) {
+    Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
+    return TypeToken.of(clazz).resolveType(types[1]).getRawType();
+  }
+
+  /**
+   * Translator that matches a Dataflow transformation with the appropriate Spark streaming
+   * evaluator. rddTranslator applies the Spark (RDD) evaluators inside transform/foreachRDD
+   * to evaluate such transformations.
+   */
+  public static class Translator implements SparkPipelineTranslator {
+
+    private final SparkPipelineTranslator rddTranslator;
+
+    public Translator(SparkPipelineTranslator rddTranslator) {
+      this.rddTranslator = rddTranslator;
+    }
+
+    @Override
+    public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
+      // streaming includes rdd transformations as well
+      return EVALUATORS.containsKey(clazz) || rddTranslator.hasTranslation(clazz);
+    }
+
+    @Override
+    public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
+      return getTransformEvaluator(clazz, rddTranslator);
+    }
+  }
+}
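
Evaluator resolution in getTransformEvaluator() is: streaming-specific evaluator first, fail fast on the unsupported IO set, otherwise wrap the batch evaluator in transform (for transforms that produce output) or foreachRDD (for PDone). A sketch of wiring the delegation, assuming TransformTranslator.Translator is the batch-side SparkPipelineTranslator (it is referenced by this commit but defined elsewhere):

    SparkPipelineTranslator batchTranslator = new TransformTranslator.Translator();
    SparkPipelineTranslator streamingTranslator =
        new StreamingTransformTranslator.Translator(batchTranslator);
    // streaming answers for batch-only transforms too, falling through to the RDD translator
    boolean supported = streamingTranslator.hasTranslation(Create.Values.class);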

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
new file mode 100644
index 0000000..504ea92
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation.streaming;
+
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.translation.TransformTranslator;
+
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+
+
+/**
+ * A pipeline {@link SparkPipelineRunner.Evaluator} that detects windowing.
+ */
+public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator {
+
+  // Currently, Spark streaming recommends batches no smaller than 500 msec
+  private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);
+
+  private boolean windowing;
+  private Duration batchDuration;
+
+  public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) {
+    super(translator);
+  }
+
+  private static final TransformTranslator.FieldGetter WINDOW_FG =
+      new TransformTranslator.FieldGetter(Window.Bound.class);
+
+  // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
+  @Override
+  protected <PT extends PTransform<? super PInput, POutput>> void
+      doVisitTransform(TransformTreeNode node) {
+    @SuppressWarnings("unchecked")
+    PT transform = (PT) node.getTransform();
+    @SuppressWarnings("unchecked")
+    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
+    if (transformClass.isAssignableFrom(Window.Bound.class)) {
+      WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
+      if (windowFn instanceof FixedWindows) {
+        setBatchDuration(((FixedWindows) windowFn).getSize());
+      } else if (windowFn instanceof SlidingWindows) {
+        if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) {
+          throw new UnsupportedOperationException("Spark does not support window offsets");
+        }
+        // the sliding window size sets the batch duration as well; applying the
+        // transformation will add the "slide"
+        setBatchDuration(((SlidingWindows) windowFn).getSize());
+      } else if (!(windowFn instanceof GlobalWindows)) {
+        throw new IllegalStateException("Windowing function not supported: " + windowFn);
+      }
+    }
+  }
+
+  private void setBatchDuration(org.joda.time.Duration duration) {
+    long durationMillis = duration.getMillis();
+    // validate window size
+    if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) {
+      throw new IllegalArgumentException("Windowing of size " + durationMillis
+          + " msec is not supported!");
+    }
+    // choose the smallest duration to be Spark's batch duration; larger ones will be handled
+    // as window functions over the batched-stream
+    if (!windowing || this.batchDuration.milliseconds() > durationMillis) {
+      this.batchDuration = Durations.milliseconds(durationMillis);
+    }
+    windowing = true;
+  }
+
+  public boolean isWindowing() {
+    return windowing;
+  }
+
+  public Duration getBatchDuration() {
+    return batchDuration;
+  }
+}
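
Expected use by the runner: traverse the pipeline before execution, then size Spark's batch interval from the smallest window found. A sketch, assuming the Evaluator base class is a Dataflow Pipeline.PipelineVisitor (traverseTopologically is the SDK's standard traversal entry point), a SparkConf named conf, and a hypothetical 1-second default when no windowing is detected:

    StreamingWindowPipelineDetector detector =
        new StreamingWindowPipelineDetector(streamingTranslator);
    pipeline.traverseTopologically(detector);
    Duration batchDuration = detector.isWindowing()
        ? detector.getBatchDuration()
        : Durations.milliseconds(1000);  // hypothetical default, not from this commit
    JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);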

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
index 98387a6..e4a3a73 100644
--- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
+++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-org.apache.beam.runners.spark.SparkPipelineOptionsRegistrar
-org.apache.beam.runners.spark.streaming.SparkStreamingPipelineOptionsRegistrar
\ No newline at end of file
+org.apache.beam.runners.spark.translation.SparkPipelineOptionsRegistrar
+org.apache.beam.runners.spark.translation.streaming.SparkStreamingPipelineOptionsRegistrar
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
index 972b1a3..7949db4 100644
--- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
+++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-org.apache.beam.runners.spark.SparkPipelineRunnerRegistrar
\ No newline at end of file
+org.apache.beam.runners.spark.translation.SparkPipelineRunnerRegistrar
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CombineGloballyTest.java
deleted file mode 100644
index 35a634a..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CombineGloballyTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.Iterables;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class CombineGloballyTest {
-
-  private static final String[] WORDS_ARRAY = {
-      "hi there", "hi", "hi sue bob",
-      "hi sue", "", "bob hi"};
-  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-
-  @Test
-  public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
-    PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
-
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
-    assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output)));
-    res.close();
-  }
-
-  public static class WordMerger extends Combine.CombineFn<String, StringBuilder, String> {
-
-    @Override
-    public StringBuilder createAccumulator() {
-      // return null to differentiate from an empty string
-      return null;
-    }
-
-    @Override
-    public StringBuilder addInput(StringBuilder accumulator, String input) {
-      return combine(accumulator, input);
-    }
-
-    @Override
-    public StringBuilder mergeAccumulators(Iterable<StringBuilder> accumulators) {
-      StringBuilder sb = new StringBuilder();
-      for (StringBuilder accum : accumulators) {
-        if (accum != null) {
-          sb.append(accum);
-        }
-      }
-      return sb;
-    }
-
-    @Override
-    public String extractOutput(StringBuilder accumulator) {
-      return accumulator != null ? accumulator.toString(): "";
-    }
-
-    private static StringBuilder combine(StringBuilder accum, String datum) {
-      if (accum == null) {
-        return new StringBuilder(datum);
-      } else {
-        accum.append(",").append(datum);
-        return accum;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CombinePerKeyTest.java
deleted file mode 100644
index a4c5eb7..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CombinePerKeyTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.*;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.ImmutableList;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class CombinePerKeyTest {
-
-    private static final List<String> WORDS =
-        ImmutableList.of("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog");
-    @Test
-    public void testRun() {
-        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-        PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
-        PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
-        EvaluationResult res = SparkPipelineRunner.create().run(p);
-        Map<String, Long> actualCnts = new HashMap<>();
-        for (KV<String, Long> kv : res.get(cnts)) {
-            actualCnts.put(kv.getKey(), kv.getValue());
-        }
-        res.close();
-        Assert.assertEquals(8, actualCnts.size());
-        Assert.assertEquals(Long.valueOf(2L), actualCnts.get("the"));
-    }
-
-    private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
-      @Override
-      public PCollection<KV<T, Long>> apply(PCollection<T> pcol) {
-          PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() {
-              @Override
-              public void processElement(ProcessContext processContext) throws Exception {
-                  processContext.output(KV.of(processContext.element(), 1L));
-              }
-          })).setCoder(KvCoder.of(pcol.getCoder(), VarLongCoder.of()));
-          return withLongs.apply(Sum.<T>longsPerKey());
-      }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
index 905e243..4a080e8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
@@ -25,6 +25,7 @@ import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.collect.ImmutableSet;
+import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.junit.Test;
 
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DoFnOutputTest.java
deleted file mode 100644
index 1ec3d75..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DoFnOutputTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import org.junit.Test;
-
-import java.io.Serializable;
-
-public class DoFnOutputTest implements Serializable {
-  @Test
-  public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
-    options.setRunner(SparkPipelineRunner.class);
-    Pipeline pipeline = Pipeline.create(options);
-
-    PCollection<String> strings = pipeline.apply(Create.of("a"));
-    // Test that values written from startBundle() and finishBundle() are written to
-    // the output
-    PCollection<String> output = strings.apply(ParDo.of(new DoFn<String, String>() {
-      @Override
-      public void startBundle(Context c) throws Exception {
-        c.output("start");
-      }
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-        c.output(c.element());
-      }
-      @Override
-      public void finishBundle(Context c) throws Exception {
-        c.output("finish");
-      }
-    }));
-
-    DataflowAssert.that(output).containsInAnyOrder("start", "a", "finish");
-
-    EvaluationResult res = SparkPipelineRunner.create().run(pipeline);
-    res.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index e0fe47d..057cf3b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -25,6 +25,7 @@ import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.collect.Iterables;
+import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.junit.Test;
 
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/MultiOutputWordCountTest.java
deleted file mode 100644
index c89090d..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/MultiOutputWordCountTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
-import com.google.cloud.dataflow.sdk.transforms.*;
-import com.google.cloud.dataflow.sdk.values.*;
-import com.google.common.collect.Iterables;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class MultiOutputWordCountTest {
-
-  private static final TupleTag<String> upper = new TupleTag<>();
-  private static final TupleTag<String> lower = new TupleTag<>();
-  private static final TupleTag<KV<String, Long>> lowerCnts = new TupleTag<>();
-  private static final TupleTag<KV<String, Long>> upperCnts = new TupleTag<>();
-
-  @Test
-  public void testRun() throws Exception {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-    PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+"));
-    PCollection<String> w1 = p.apply(Create.of("Here are some words to count", "and some others"));
-    PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words"));
-    PCollectionList<String> list = PCollectionList.of(w1).and(w2);
-
-    PCollection<String> union = list.apply(Flatten.<String>pCollections());
-    PCollectionView<String> regexView = regex.apply(View.<String>asSingleton());
-    CountWords countWords = new CountWords(regexView);
-    PCollectionTuple luc = union.apply(countWords);
-    PCollection<Long> unique = luc.get(lowerCnts).apply(
-        ApproximateUnique.<KV<String, Long>>globally(16));
-
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
-    Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts));
-    Assert.assertEquals("are", actualLower.iterator().next().getKey());
-    Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
-    Assert.assertEquals("Here", actualUpper.iterator().next().getKey());
-    Iterable<Long> actualUniqCount = res.get(unique);
-    Assert.assertEquals(9, (long) actualUniqCount.iterator().next());
-    int actualTotalWords = res.getAggregatorValue("totalWords", Integer.class);
-    Assert.assertEquals(18, actualTotalWords);
-    int actualMaxWordLength = res.getAggregatorValue("maxWordLength", Integer.class);
-    Assert.assertEquals(6, actualMaxWordLength);
-    AggregatorValues<Integer> aggregatorValues = res.getAggregatorValues(countWords
-        .getTotalWordsAggregator());
-    Assert.assertEquals(18, Iterables.getOnlyElement(aggregatorValues.getValues()).intValue());
-
-    res.close();
-  }
-
-  /**
-   * A DoFn that tokenizes lines of text into individual words.
-   */
-  static class ExtractWordsFn extends DoFn<String, String> {
-
-    private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
-        new Sum.SumIntegerFn());
-    private final Aggregator<Integer, Integer> maxWordLength = createAggregator("maxWordLength",
-        new Max.MaxIntegerFn());
-    private final PCollectionView<String> regex;
-
-    ExtractWordsFn(PCollectionView<String> regex) {
-      this.regex = regex;
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      String[] words = c.element().split(c.sideInput(regex));
-      for (String word : words) {
-        totalWords.addValue(1);
-        if (!word.isEmpty()) {
-          maxWordLength.addValue(word.length());
-          if (Character.isLowerCase(word.charAt(0))) {
-            c.output(word);
-          } else {
-            c.sideOutput(upper, word);
-          }
-        }
-      }
-    }
-  }
-
-  public static class CountWords extends PTransform<PCollection<String>, PCollectionTuple> {
-
-    private final PCollectionView<String> regex;
-    private final ExtractWordsFn extractWordsFn;
-
-    public CountWords(PCollectionView<String> regex) {
-      this.regex = regex;
-      this.extractWordsFn = new ExtractWordsFn(regex);
-    }
-
-    @Override
-    public PCollectionTuple apply(PCollection<String> lines) {
-      // Convert lines of text into individual words.
-      PCollectionTuple lowerUpper = lines
-          .apply(ParDo.of(extractWordsFn)
-              .withSideInputs(regex)
-              .withOutputTags(lower, TupleTagList.of(upper)));
-      lowerUpper.get(lower).setCoder(StringUtf8Coder.of());
-      lowerUpper.get(upper).setCoder(StringUtf8Coder.of());
-      PCollection<KV<String, Long>> lowerCounts = lowerUpper.get(lower).apply(Count
-          .<String>perElement());
-      PCollection<KV<String, Long>> upperCounts = lowerUpper.get(upper).apply(Count
-          .<String>perElement());
-      return PCollectionTuple
-          .of(lowerCnts, lowerCounts)
-          .and(upperCnts, upperCounts);
-    }
-
-    Aggregator<Integer, Integer> getTotalWordsAggregator() {
-      return extractWordsFn.totalWords;
-    }
-  }
-}

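The deleted MultiOutputWordCountTest was the runner's main coverage for ParDo with multiple outputs. As a compact reference, a sketch of the tag-based side-output pattern it exercised (the routing predicate and names are illustrative; the API calls mirror the deleted code):

    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
    import com.google.cloud.dataflow.sdk.values.TupleTag;
    import com.google.cloud.dataflow.sdk.values.TupleTagList;

    public class SideOutputSketch {
      static final TupleTag<String> lower = new TupleTag<>(); // main output tag
      static final TupleTag<String> upper = new TupleTag<>(); // side output tag

      static PCollectionTuple split(PCollection<String> words) {
        // Elements go to the main tag by default and to a side tag on demand;
        // callers read each branch with outputs.get(lower) / outputs.get(upper).
        return words.apply(ParDo.of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            String word = c.element();
            if (!word.isEmpty() && Character.isUpperCase(word.charAt(0))) {
              c.sideOutput(upper, word); // routed to the side output
            } else {
              c.output(word);            // routed to the main output
            }
          }
        }).withOutputTags(lower, TupleTagList.of(upper)));
      }
    }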
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SerializationTest.java
deleted file mode 100644
index ae1eed7..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SerializationTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.transforms.*;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-public class SerializationTest {
-
-  public static class StringHolder { // not serializable
-    private final String string;
-
-    public StringHolder(String string) {
-      this.string = string;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      StringHolder that = (StringHolder) o;
-      return string.equals(that.string);
-    }
-
-    @Override
-    public int hashCode() {
-      return string.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return string;
-    }
-  }
-
-  public static class StringHolderUtf8Coder extends AtomicCoder<StringHolder> {
-
-    private final StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of();
-
-    @Override
-    public void encode(StringHolder value, OutputStream outStream, Context context) throws IOException {
-      stringUtf8Coder.encode(value.toString(), outStream, context);
-    }
-
-    @Override
-    public StringHolder decode(InputStream inStream, Context context) throws IOException {
-      return new StringHolder(stringUtf8Coder.decode(inStream, context));
-    }
-
-    public static Coder<StringHolder> of() {
-      return new StringHolderUtf8Coder();
-    }
-  }
-
-  private static final String[] WORDS_ARRAY = {
-      "hi there", "hi", "hi sue bob",
-      "hi sue", "", "bob hi"};
-  private static final List<StringHolder> WORDS = Lists.transform(
-      Arrays.asList(WORDS_ARRAY), new Function<String, StringHolder>() {
-        @Override public StringHolder apply(String s) {
-          return new StringHolder(s);
-        }
-      });
-  private static final Set<StringHolder> EXPECTED_COUNT_SET =
-      ImmutableSet.copyOf(Lists.transform(
-          Arrays.asList("hi: 5", "there: 1", "sue: 2", "bob: 2"),
-          new Function<String, StringHolder>() {
-            @Override
-            public StringHolder apply(String s) {
-              return new StringHolder(s);
-            }
-          }));
-
-  @Test
-  public void testRun() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
-    options.setRunner(SparkPipelineRunner.class);
-    Pipeline p = Pipeline.create(options);
-    PCollection<StringHolder> inputWords =
-        p.apply(Create.of(WORDS).withCoder(StringHolderUtf8Coder.of()));
-    PCollection<StringHolder> output = inputWords.apply(new CountWords());
-
-    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
-    res.close();
-  }
-
-  /**
-   * A DoFn that tokenizes lines of text into individual words.
-   */
-  static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> {
-    private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
-
-    @Override
-    public void processElement(ProcessContext c) {
-      // Split the line into words.
-      String[] words = WORD_BOUNDARY.split(c.element().toString());
-
-      // Keep track of the number of lines without any words encountered while tokenizing.
-      // This aggregator is visible in the monitoring UI when run using DataflowPipelineRunner.
-      if (words.length == 0) {
-        emptyLines.addValue(1L);
-      }
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(new StringHolder(word));
-        }
-      }
-    }
-  }
-
-  /**
-   * A DoFn that converts a word and its count into a printable string.
-   */
-  private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> {
-    @Override
-    public void processElement(ProcessContext c) {
-      c.output(new StringHolder(c.element().getKey() + ": " + c.element().getValue()));
-    }
-  }
-
-  private static class CountWords extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> {
-    @Override
-    public PCollection<StringHolder> apply(PCollection<StringHolder> lines) {
-
-      // Convert lines of text into individual words.
-      PCollection<StringHolder> words = lines.apply(
-          ParDo.of(new ExtractWordsFn()));
-
-      // Count the number of times each word occurs.
-      PCollection<KV<StringHolder, Long>> wordCounts =
-          words.apply(Count.<StringHolder>perElement());
-
-      // Format each word and count into a printable string.
-      return wordCounts.apply(ParDo.of(new FormatCountsFn()));
-    }
-  }
-}

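SerializationTest, deleted above, documented how a non-serializable element type crosses Spark's serialization boundary: supply a coder instead of making the type Serializable. Distilled below (StringHolder is the class defined in the deleted test; the delegation to StringUtf8Coder is as the test had it):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
    import com.google.cloud.dataflow.sdk.coders.Coder;
    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;

    public class StringHolderUtf8CoderSketch extends AtomicCoder<StringHolder> {
      private final StringUtf8Coder stringCoder = StringUtf8Coder.of();

      @Override
      public void encode(StringHolder value, OutputStream out, Context context)
          throws IOException {
        stringCoder.encode(value.toString(), out, context); // write as a UTF-8 string
      }

      @Override
      public StringHolder decode(InputStream in, Context context) throws IOException {
        return new StringHolder(stringCoder.decode(in, context)); // rebuild the holder
      }

      public static Coder<StringHolder> of() {
        return new StringHolderUtf8CoderSketch();
      }
    }

    // Attached at the source, as in the deleted test:
    //   p.apply(Create.of(WORDS).withCoder(StringHolderUtf8CoderSketch.of()));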
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java
deleted file mode 100644
index bdc048c..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.net.URI;
-
-import static org.junit.Assert.*;
-
-public class SideEffectsTest implements Serializable {
-
-  static class UserException extends RuntimeException {
-  }
-
-  @Test
-  public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
-    options.setRunner(SparkPipelineRunner.class);
-    Pipeline pipeline = Pipeline.create(options);
-
-    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
-
-    pipeline.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-        throw new UserException();
-      }
-    }));
-
-    try {
-      pipeline.run();
-      fail("Run should have thrown an exception");
-    } catch (RuntimeException e) {
-      assertNotNull(e.getCause());
-
-      // TODO: remove the version check (and the setup and teardown methods) when we no
-      // longer support Spark 1.3 or 1.4
-      String version = SparkContextFactory.getSparkContext(options.getSparkMaster(), options.getAppName()).version();
-      if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
-        assertTrue(e.getCause() instanceof UserException);
-      }
-    }
-  }
-
-  @Before
-  public void setup() {
-    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "true");
-  }
-
-  @After
-  public void teardown() {
-    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "false");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index c7dc400..e32b39a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -25,6 +25,7 @@ import com.google.cloud.dataflow.sdk.transforms.*;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.collect.ImmutableSet;
+import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.junit.Test;
 
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java
deleted file mode 100644
index 23416d7..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestSparkPipelineOptionsFactory {
-  @Test
-  public void testDefaultCreateMethod() {
-    SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create();
-    Assert.assertEquals("local[1]", actualOptions.getSparkMaster());
-  }
-
-  @Test
-  public void testSettingCustomOptions() {
-    SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create();
-    actualOptions.setSparkMaster("spark://207.184.161.138:7077");
-    Assert.assertEquals("spark://207.184.161.138:7077", actualOptions.getSparkMaster());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java
deleted file mode 100644
index ed58c77..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.api.client.repackaged.com.google.common.base.Joiner;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Charsets;
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * A test for the transforms registered in TransformTranslator.
- * Builds a regular Dataflow pipeline with each of the mapped
- * transforms, and makes sure that they work when the pipeline is
- * executed in Spark.
- */
-public class TransformTranslatorTest {
-
-  @Rule
-  public TestName name = new TestName();
-
-  private DirectPipelineRunner directRunner;
-  private SparkPipelineRunner sparkRunner;
-  private String testDataDirName;
-
-  @Before public void init() throws IOException {
-    sparkRunner = SparkPipelineRunner.create();
-    directRunner = DirectPipelineRunner.createForTest();
-    testDataDirName = Joiner.on(File.separator).join("target", "test-data", name.getMethodName())
-        + File.separator;
-    FileUtils.deleteDirectory(new File(testDataDirName));
-    new File(testDataDirName).mkdirs();
-  }
-
-  /**
-   * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs it with both
-   * DirectPipelineRunner and SparkPipelineRunner (using the mapped Dataflow-to-Spark
-   * transforms), and verifies that the two runs produce the same results.
-   */
-  @Test
-  public void testTextIOReadAndWriteTransforms() throws IOException {
-    String directOut = runPipeline("direct", directRunner);
-    String sparkOut = runPipeline("spark", sparkRunner);
-
-    List<String> directOutput =
-        Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
-
-    List<String> sparkOutput =
-        Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);
-
-    // sort output to get a stable result (PCollections are not ordered)
-    Collections.sort(directOutput);
-    Collections.sort(sparkOutput);
-
-    Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray());
-  }
-
-  private String runPipeline(String name, PipelineRunner<?> runner) {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-    String outFile = Joiner.on(File.separator).join(testDataDirName, "test_text_out_" + name);
-    PCollection<String> lines = p.apply(TextIO.Read.from("src/test/resources/test_text.txt"));
-    lines.apply(TextIO.Write.to(outFile));
-    runner.run(p);
-    return outFile;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java
deleted file mode 100644
index 77409a0..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.ImmutableList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.joda.time.Duration;
-import org.junit.Test;
-
-public class WindowedWordCountTest {
-  private static final String[] WORDS_ARRAY = {
-          "hi there", "hi", "hi sue bob",
-          "hi sue", "", "bob hi"};
-  private static final Long[] TIMESTAMPS_ARRAY = {
-          60000L, 60000L, 60000L,
-          120000L, 120000L, 120000L};
-  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-  private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY);
-  private static final List<String> EXPECTED_COUNT_SET =
-          ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1",
-                  "hi: 2", "sue: 1", "bob: 1");
-
-  @Test
-  public void testRun() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
-    options.setRunner(SparkPipelineRunner.class);
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
-            .setCoder(StringUtf8Coder.of());
-    PCollection<String> windowedWords = inputWords
-            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
-
-    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
-
-    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
-    res.close();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 5609e88..9edf41c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -30,7 +30,7 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptionsFactory;
+import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.junit.Before;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java
deleted file mode 100644
index a7b9f28..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.streaming;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-
-import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
-import org.apache.beam.runners.spark.streaming.utils.DataflowAssertStreaming;
-
-import org.joda.time.Duration;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Test Flatten (union) implementation for streaming.
- */
-public class FlattenStreamingTest {
-
-  private static final String[] WORDS_ARRAY_1 = {
-      "one", "two", "three", "four"};
-  private static final List<Iterable<String>> WORDS_QUEUE_1 =
-      Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_1));
-  private static final String[] WORDS_ARRAY_2 = {
-          "five", "six", "seven", "eight"};
-  private static final List<Iterable<String>> WORDS_QUEUE_2 =
-          Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_2));
-  private static final String[] EXPECTED_UNION = {
-          "one", "two", "three", "four", "five", "six", "seven", "eight"};
-  private static final long TEST_TIMEOUT_MSEC = 1000L;
-
-  @Test
-  public void testRun() throws Exception {
-    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
-    options.setAppName(this.getClass().getSimpleName());
-    options.setRunner(SparkPipelineRunner.class);
-    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
-    Pipeline p = Pipeline.create(options);
-
-    PCollection<String> w1 =
-            p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of());
-    PCollection<String> windowedW1 =
-            w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollection<String> w2 =
-            p.apply(CreateStream.fromQueue(WORDS_QUEUE_2)).setCoder(StringUtf8Coder.of());
-    PCollection<String> windowedW2 =
-            w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
-    PCollection<String> union = list.apply(Flatten.<String>pCollections());
-
-    DataflowAssert.thatIterable(union.apply(View.<String>asIterable()))
-            .containsInAnyOrder(EXPECTED_UNION);
-
-    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
-    res.close();
-
-    DataflowAssertStreaming.assertNoFailures(res);
-  }
-
-}

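FlattenStreamingTest, deleted above, also captured the general recipe for streaming tests on this runner: bound the run with a timeout, execute, then check the streaming asserts. A distilled sketch (imports for the streaming options class and its factory are omitted, since their package is exactly what this commit reshuffles; the rest mirrors the deleted test):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import org.apache.beam.runners.spark.EvaluationResult;
    import org.apache.beam.runners.spark.SparkPipelineRunner;
    import org.apache.beam.runners.spark.streaming.utils.DataflowAssertStreaming;

    public class StreamingTestRecipe {
      static void runAndVerify(Pipeline p, SparkStreamingPipelineOptions options) {
        options.setRunner(SparkPipelineRunner.class);
        options.setTimeout(1000L); // run for roughly one batch interval, then stop

        EvaluationResult res = SparkPipelineRunner.create(options).run(p);
        res.close();

        // Streaming DataflowAsserts are realized as aggregators; their failures
        // are checked after the bounded run completes.
        DataflowAssertStreaming.assertNoFailures(res);
      }
    }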