beam-commits mailing list archives

From amits...@apache.org
Subject [44/50] [abbrv] incubator-beam git commit: Add spark-streaming support to spark-dataflow
Date Thu, 10 Mar 2016 20:59:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
new file mode 100644
index 0000000..57253f0
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+
+import com.cloudera.dataflow.spark.SparkPipelineOptions;
+
+/**
+ * Options used to configure Spark streaming.
+ */
+public interface SparkStreamingPipelineOptions extends SparkPipelineOptions {
+  @Description("Timeout to wait (in msec) for the streaming execution to stop, -1 runs until " +
+          "execution is stopped")
+  @Default.Long(-1)
+  Long getTimeout();
+
+  void setTimeout(Long timeout);
+
+  @Override
+  @Default.Boolean(true)
+  boolean isStreaming();
+
+  @Override
+  @Default.String("spark streaming dataflow pipeline job")
+  String getAppName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
new file mode 100644
index 0000000..3b568af
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+
+public final class SparkStreamingPipelineOptionsFactory {
+
+  private SparkStreamingPipelineOptionsFactory() {
+  }
+
+  public static SparkStreamingPipelineOptions create() {
+    return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
+  }
+}
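
For context, a minimal sketch of how these options are created and configured, mirroring the streaming tests added later in this commit (the 1000 msec timeout is simply the value those tests use; -1, the default, runs until the execution is stopped):

    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
    options.setRunner(SparkPipelineRunner.class); // execute on the Spark runner
    options.setTimeout(1000L);                    // stop the streaming execution after ~1 sec
    Pipeline p = Pipeline.create(options);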

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..01c4375
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.common.collect.ImmutableList;
+
+public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.<Class<? extends PipelineOptions>>of(
+        SparkStreamingPipelineOptions.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
new file mode 100644
index 0000000..5e1b42d
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.cloud.dataflow.sdk.values.PValue;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+
+import com.cloudera.dataflow.spark.EvaluationContext;
+import com.cloudera.dataflow.spark.SparkRuntimeContext;
+
+/**
+ * Evaluation context for streaming pipelines: tracks DStreams in addition to batch RDDs.
+ */
+public class StreamingEvaluationContext extends EvaluationContext {
+
+  private final JavaStreamingContext jssc;
+  private final long timeout;
+  private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>();
+  private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>();
+
+  public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
+      JavaStreamingContext jssc, long timeout) {
+    super(jsc, pipeline);
+    this.jssc = jssc;
+    this.timeout = timeout;
+  }
+
+  /**
+   * Holds a DStream. Can also create a DStream from a supplied queue of values, which is mainly
+   * used for testing.
+   */
+  private class DStreamHolder<T> {
+
+    private Iterable<Iterable<T>> values;
+    private Coder<T> coder;
+    private JavaDStream<WindowedValue<T>> dStream;
+
+    public DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
+      this.values = values;
+      this.coder = coder;
+    }
+
+    public DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
+      this.dStream = dStream;
+    }
+
+    @SuppressWarnings("unchecked")
+    public JavaDStream<WindowedValue<T>> getDStream() {
+      if (dStream == null) {
+        // create the DStream from values
+        Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+        for (Iterable<T> v : values) {
+          setOutputRDDFromValues(currentTransform.getTransform(), v, coder);
+          rddQueue.offer((JavaRDD<WindowedValue<T>>) getOutputRDD(currentTransform.getTransform()));
+        }
+        // create the DStream from the queue, one RDD at a time, with no default RDD;
+        // mainly used for unit tests, so no need for this to be configurable
+        dStream = jssc.queueStream(rddQueue, true);
+      }
+      return dStream;
+    }
+  }
+
+  public <T> void setDStreamFromQueue(PTransform<?, ?> transform, Iterable<Iterable<T>> values,
+      Coder<T> coder) {
+    pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder));
+  }
+
+  public <T, R extends JavaRDDLike<WindowedValue<T>, R>>
+      void setStream(PTransform<?, ?> transform, JavaDStreamLike<WindowedValue<T>, ?, R> dStream) {
+    PValue pvalue = (PValue) getOutput(transform);
+    @SuppressWarnings("unchecked")
+    DStreamHolder<T> dStreamHolder =
+        new DStreamHolder<>((JavaDStream<WindowedValue<T>>) dStream);
+    pstreams.put(pvalue, dStreamHolder);
+    leafStreams.add(dStreamHolder);
+  }
+
+  boolean hasStream(PTransform<?, ?> transform) {
+    PValue pvalue = (PValue) getInput(transform);
+    return pstreams.containsKey(pvalue);
+  }
+
+  public JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
+    PValue pvalue = (PValue) getInput(transform);
+    DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
+    JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
+    leafStreams.remove(dStreamHolder);
+    return dStream;
+  }
+
+  // used to set the RDD from the DStream in the RDDHolder for transformation
+  public <T> void setInputRDD(PTransform<? extends PInput, ?> transform,
+      JavaRDDLike<WindowedValue<T>, ?> rdd) {
+    setRDD((PValue) getInput(transform), rdd);
+  }
+
+  // used to get the RDD transformation output and use it as the DStream transformation output
+  public JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
+    return getRDD((PValue) getOutput(transform));
+  }
+
+  public JavaStreamingContext getStreamingContext() {
+    return jssc;
+  }
+
+  @Override
+  protected void computeOutputs() {
+    for (DStreamHolder<?> streamHolder : leafStreams) {
+      @SuppressWarnings("unchecked")
+      JavaDStream<WindowedValue<?>> stream = (JavaDStream) streamHolder.getDStream();
+      stream.foreachRDD(new Function<JavaRDD<WindowedValue<?>>, Void>() {
+        @Override
+        public Void call(JavaRDD<WindowedValue<?>> rdd) throws Exception {
+          rdd.rdd().cache();
+          rdd.count();
+          return null;
+        }
+      }); // force a DStream action
+    }
+  }
+
+  @Override
+  public void close() {
+    if (timeout > 0) {
+      jssc.awaitTerminationOrTimeout(timeout);
+    } else {
+      jssc.awaitTermination();
+    }
+    //TODO: stop gracefully ?
+    jssc.stop(false, false);
+    state = State.DONE;
+    super.close();
+  }
+
+  private State state = State.RUNNING;
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  //---------------- override in order to expose in package
+  @Override
+  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
+    return super.getOutput(transform);
+  }
+
+  @Override
+  protected JavaSparkContext getSparkContext() {
+    return super.getSparkContext();
+  }
+
+  @Override
+  protected SparkRuntimeContext getRuntimeContext() {
+    return super.getRuntimeContext();
+  }
+
+  @Override
+  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+    super.setCurrentTransform(transform);
+  }
+
+  @Override
+  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return super.getCurrentTransform();
+  }
+
+  @Override
+  protected <T> void setOutputRDD(PTransform<?, ?> transform,
+      JavaRDDLike<WindowedValue<T>, ?> rdd) {
+    super.setOutputRDD(transform, rdd);
+  }
+
+  @Override
+  protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
+      Coder<T> coder) {
+    super.setOutputRDDFromValues(transform, values, coder);
+  }
+
+  @Override
+  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
+    return super.hasOutputRDD(transform);
+  }
+}
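
A rough sketch of how a runner might wire this context together; this is assumed wiring rather than code from this commit (batchDuration would come from the StreamingWindowPipelineDetector added below, and the two-argument SparkContextFactory.getSparkContext call matches the updated signature visible in the SideEffectsTest hunk):

    JavaSparkContext jsc = SparkContextFactory.getSparkContext(
        options.getSparkMaster(), options.getAppName());
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
    StreamingEvaluationContext ctxt =
        new StreamingEvaluationContext(jsc, pipeline, jssc, options.getTimeout());
    // ... translate the pipeline and start the streaming context, then:
    ctxt.close(); // awaits termination (or the configured timeout) and stops jssc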

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
new file mode 100644
index 0000000..20ee88a
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
@@ -0,0 +1,409 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.api.client.util.Maps;
+import com.google.api.client.util.Sets;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.reflect.TypeToken;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+import kafka.serializer.Decoder;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
+import org.apache.spark.streaming.api.java.JavaPairInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+
+import scala.Tuple2;
+
+import com.cloudera.dataflow.hadoop.HadoopIO;
+import com.cloudera.dataflow.io.ConsoleIO;
+import com.cloudera.dataflow.io.CreateStream;
+import com.cloudera.dataflow.io.KafkaIO;
+import com.cloudera.dataflow.spark.DoFnFunction;
+import com.cloudera.dataflow.spark.EvaluationContext;
+import com.cloudera.dataflow.spark.SparkPipelineTranslator;
+import com.cloudera.dataflow.spark.TransformEvaluator;
+import com.cloudera.dataflow.spark.TransformTranslator;
+import com.cloudera.dataflow.spark.WindowingHelpers;
+
+/**
+ * Supports translation between a Dataflow transform and Spark's operations on DStreams.
+ */
+public final class StreamingTransformTranslator {
+
+  private StreamingTransformTranslator() {
+  }
+
+  private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
+    return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
+      @Override
+      public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
+        @SuppressWarnings("unchecked")
+        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
+            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
+            ((StreamingEvaluationContext) context).getStream(transform);
+        dstream.map(WindowingHelpers.<T>unwindowFunction())
+            .print(transform.getNum());
+      }
+    };
+  }
+
+  private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
+    return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
+      @Override
+      public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
+        JavaStreamingContext jssc = ((StreamingEvaluationContext) context).getStreamingContext();
+        Class<K> keyClazz = transform.getKeyClass();
+        Class<V> valueClazz = transform.getValueClass();
+        Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
+        Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass();
+        Map<String, String> kafkaParams = transform.getKafkaParams();
+        Set<String> topics = transform.getTopics();
+        JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz,
+                valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics);
+        JavaDStream<WindowedValue<KV<K, V>>> inputStream =
+            inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() {
+          @Override
+          public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
+            return KV.of(t2._1(), t2._2());
+          }
+        }).map(WindowingHelpers.<KV<K, V>>windowFunction());
+        ((StreamingEvaluationContext) context).setStream(transform, inputStream);
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<Create.Values<T>> create() {
+    return new TransformEvaluator<Create.Values<T>>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void evaluate(Create.Values<T> transform, EvaluationContext context) {
+        Iterable<T> elems = transform.getElements();
+        Coder<T> coder = ((StreamingEvaluationContext) context).getOutput(transform)
+                .getCoder();
+        if (coder != VoidCoder.of()) {
+          // actual create
+          ((StreamingEvaluationContext) context).setOutputRDDFromValues(transform,
+                  elems, coder);
+        } else {
+          // fake create as an input
+          // creates a stream with a single batch containing a single null element
+          // to invoke following transformations once
+          // to support DataflowAssert
+          ((StreamingEvaluationContext) context).setDStreamFromQueue(transform,
+              Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)),
+              (Coder<Void>) coder);
+        }
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
+    return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
+      @Override
+      public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext
+              context) {
+        Iterable<Iterable<T>> values = transform.getQueuedValues();
+        Coder<T> coder = ((StreamingEvaluationContext) context).getOutput(transform)
+            .getCoder();
+        ((StreamingEvaluationContext) context).setDStreamFromQueue(transform, values,
+            coder);
+      }
+    };
+  }
+
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform(
+      final SparkPipelineTranslator rddTranslator) {
+    return new TransformEvaluator<PT>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void evaluate(final PT transform,
+                           final EvaluationContext context) {
+        final TransformEvaluator rddEvaluator =
+            rddTranslator.translate((Class<? extends PTransform<?, ?>>) transform.getClass());
+
+        if (((StreamingEvaluationContext) context).hasStream(transform)) {
+          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
+              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
+              ((StreamingEvaluationContext) context).getStream(transform);
+
+          ((StreamingEvaluationContext) context).setStream(transform, dStream
+              .transform(new RDDTransform<>((StreamingEvaluationContext) context,
+              rddEvaluator, transform)));
+        } else {
+          // if the transformation requires direct access to RDD (not in stream)
+          // this is used for "fake" transformations like with DataflowAssert
+          rddEvaluator.evaluate(transform, context);
+        }
+      }
+    };
+  }
+
+  /**
+   * RDD transform function. If the transformation function doesn't have an input, creates a
+   * fake one as an empty RDD.
+   *
+   * @param <PT> PTransform type
+   */
+  private static final class RDDTransform<PT extends PTransform<?, ?>>
+      implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
+
+    private final StreamingEvaluationContext context;
+    private final AppliedPTransform<?, ?, ?> appliedPTransform;
+    private final TransformEvaluator rddEvaluator;
+    private final PT transform;
+
+
+    private RDDTransform(StreamingEvaluationContext context, TransformEvaluator rddEvaluator,
+        PT transform) {
+      this.context = context;
+      this.appliedPTransform = context.getCurrentTransform();
+      this.rddEvaluator = rddEvaluator;
+      this.transform = transform;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public JavaRDD<WindowedValue<Object>>
+        call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
+      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
+      context.setCurrentTransform(appliedPTransform);
+      context.setInputRDD(transform, rdd);
+      rddEvaluator.evaluate(transform, context);
+      if (!context.hasOutputRDD(transform)) {
+        // fake RDD as output
+        context.setOutputRDD(transform,
+            context.getSparkContext().<WindowedValue<Object>>emptyRDD());
+      }
+      JavaRDD<WindowedValue<Object>> outRDD =
+          (JavaRDD<WindowedValue<Object>>) context.getOutputRDD(transform);
+      context.setCurrentTransform(existingAPT);
+      return outRDD;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD(
+      final SparkPipelineTranslator rddTranslator) {
+    return new TransformEvaluator<PT>() {
+      @Override
+      public void evaluate(final PT transform,
+                           final EvaluationContext context) {
+        final TransformEvaluator rddEvaluator =
+            rddTranslator.translate((Class<? extends PTransform<?, ?>>) transform.getClass());
+
+        if (((StreamingEvaluationContext) context).hasStream(transform)) {
+          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
+              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>) (
+              (StreamingEvaluationContext) context).getStream(transform);
+
+          dStream.foreachRDD(new RDDOutputOperator<>((StreamingEvaluationContext) context,
+              rddEvaluator, transform));
+        } else {
+          rddEvaluator.evaluate(transform, context);
+        }
+      }
+    };
+  }
+
+  /**
+   * RDD output function.
+   *
+   * @param <PT> PTransform type
+   */
+  private static final class RDDOutputOperator<PT extends PTransform<?, ?>>
+      implements Function<JavaRDD<WindowedValue<Object>>, Void> {
+
+    private final StreamingEvaluationContext context;
+    private final AppliedPTransform<?, ?, ?> appliedPTransform;
+    private final TransformEvaluator rddEvaluator;
+    private final PT transform;
+
+
+    private RDDOutputOperator(StreamingEvaluationContext context, TransformEvaluator rddEvaluator,
+        PT transform) {
+      this.context = context;
+      this.appliedPTransform = context.getCurrentTransform();
+      this.rddEvaluator = rddEvaluator;
+      this.transform = transform;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
+      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
+      context.setCurrentTransform(appliedPTransform);
+      context.setInputRDD(transform, rdd);
+      rddEvaluator.evaluate(transform, context);
+      context.setCurrentTransform(existingAPT);
+      return null;
+    }
+  }
+
+  static final TransformTranslator.FieldGetter WINDOW_FG =
+      new TransformTranslator.FieldGetter(Window.Bound.class);
+
+  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
+    return new TransformEvaluator<Window.Bound<T>>() {
+      @Override
+      public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
+        //--- first we apply windowing to the stream
+        WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<T>> dStream =
+            (JavaDStream<WindowedValue<T>>)
+            ((StreamingEvaluationContext) context).getStream(transform);
+        if (windowFn instanceof FixedWindows) {
+          Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize()
+              .getMillis());
+          ((StreamingEvaluationContext) context)
+              .setStream(transform, dStream.window(windowDuration));
+        } else if (windowFn instanceof SlidingWindows) {
+          Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize()
+              .getMillis());
+          Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod()
+              .getMillis());
+          ((StreamingEvaluationContext) context)
+              .setStream(transform, dStream.window(windowDuration, slideDuration));
+        }
+        //--- then we apply windowing to the elements
+        DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
+        DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn,
+            ((StreamingEvaluationContext)context).getRuntimeContext(), null);
+        @SuppressWarnings("unchecked")
+        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
+            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
+            ((StreamingEvaluationContext) context).getStream(transform);
+        //noinspection unchecked
+        ((StreamingEvaluationContext) context).setStream(transform,
+             dstream.mapPartitions(dofn));
+      }
+    };
+  }
+
+  private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps
+      .newHashMap();
+
+  static {
+    EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
+    EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
+    EVALUATORS.put(Create.Values.class, create());
+    EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
+    EVALUATORS.put(Window.Bound.class, window());
+  }
+
+  private static final Set<Class<? extends PTransform>> UNSUPPORTED_EVALUATORS = Sets
+      .newHashSet();
+
+  static {
+    //TODO - add support for the following
+    UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(Flatten.FlattenPCollectionList.class);
+  }
+
+  private static <PT extends PTransform<?, ?>> boolean hasTransformEvaluator(Class<PT> clazz) {
+    return EVALUATORS.containsKey(clazz);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
+      getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) {
+    TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
+    if (transform == null) {
+      if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
+        throw new UnsupportedOperationException("Dataflow transformation " + clazz
+          .getCanonicalName()
+          + " is currently unsupported by the Spark streaming pipeline");
+      }
+      // DStream transformations will transform an RDD into another RDD
+      // Actions will create output
+      // In Dataflow this depends on the PTransform's input and output classes
+      Class<?> pTOutputClazz = getPTransformOutputClazz(clazz);
+      if (pTOutputClazz == PDone.class) {
+        return foreachRDD(rddTranslator);
+      } else {
+        return rddTransform(rddTranslator);
+      }
+    }
+    return transform;
+  }
+
+  private static <PT extends PTransform<?, ?>> Class<?>
+      getPTransformOutputClazz(Class<PT> clazz) {
+    Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
+    return TypeToken.of(clazz).resolveType(types[1]).getRawType();
+  }
+
+  /**
+   * Translator that matches a Dataflow transformation with the appropriate Spark streaming
+   * evaluator. rddTranslator uses Spark evaluators in transform/foreachRDD to evaluate the
+   * transformation.
+   */
+  public static class Translator implements SparkPipelineTranslator {
+
+    private final SparkPipelineTranslator rddTranslator;
+
+    public Translator(SparkPipelineTranslator rddTranslator) {
+      this.rddTranslator = rddTranslator;
+    }
+
+    @Override
+    public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
+      // streaming includes rdd transformations as well
+      return hasTransformEvaluator(clazz) || rddTranslator.hasTranslation(clazz);
+    }
+
+    @Override
+    public TransformEvaluator<? extends PTransform<?, ?>> translate(
+        Class<? extends PTransform<?, ?>> clazz) {
+      return getTransformEvaluator(clazz, rddTranslator);
+    }
+  }
+}
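
How this translator is intended to be used, sketched under the assumption that the batch-mode TransformTranslator exposes a corresponding SparkPipelineTranslator implementation (named TransformTranslator.Translator here), which is what the Translator constructor above expects:

    SparkPipelineTranslator batchTranslator = new TransformTranslator.Translator();
    SparkPipelineTranslator translator =
        new StreamingTransformTranslator.Translator(batchTranslator);
    // hasTranslation()/translate() now resolve streaming-specific transforms
    // (KafkaIO.Read, CreateStream, Create.Values, Window.Bound, ConsoleIO.Write)
    // to streaming evaluators, and fall back to the wrapped batch evaluators,
    // applied via transform/foreachRDD, for everything else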

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
new file mode 100644
index 0000000..f9b2d2b
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+
+import com.cloudera.dataflow.spark.SparkPipelineRunner;
+import com.cloudera.dataflow.spark.SparkPipelineTranslator;
+import com.cloudera.dataflow.spark.TransformTranslator;
+
+/**
+ * A pipeline {@link SparkPipelineRunner.Evaluator} that detects windowing.
+ */
+public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator {
+
+  // Currently, Spark streaming recommends batches no smaller than 500 msec
+  private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);
+
+  private boolean windowing;
+  private Duration batchDuration;
+
+  public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) {
+    super(translator);
+  }
+
+  static final TransformTranslator.FieldGetter WINDOW_FG =
+      new TransformTranslator.FieldGetter(Window.Bound.class);
+
+  // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
+  protected <PT extends PTransform<? super PInput, POutput>> void
+      doVisitTransform(TransformTreeNode node) {
+    @SuppressWarnings("unchecked")
+    PT transform = (PT) node.getTransform();
+    @SuppressWarnings("unchecked")
+    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
+    if (transformClass.isAssignableFrom(Window.Bound.class)) {
+      WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
+      if (windowFn instanceof FixedWindows) {
+        setBatchDuration(((FixedWindows) windowFn).getSize());
+      } else if (windowFn instanceof SlidingWindows) {
+        if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) {
+          throw new UnsupportedOperationException("Spark does not support window offsets");
+        }
+        // The sliding window size sets the batch duration; applying the transformation
+        // will add the "slide"
+        setBatchDuration(((SlidingWindows) windowFn).getSize());
+      } else if (!(windowFn instanceof GlobalWindows)) {
+        throw new IllegalStateException("Windowing function not supported: " + windowFn);
+      }
+    }
+  }
+
+  private void setBatchDuration(org.joda.time.Duration duration) {
+    long durationMillis = duration.getMillis();
+    // validate window size
+    if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) {
+      throw new IllegalArgumentException("Windowing of size " + durationMillis +
+          " msec is not supported!");
+    }
+    // choose the smallest duration to be Spark's batch duration; larger ones will be handled
+    // as window functions over the batched stream
+    if (!windowing || this.batchDuration.milliseconds() > durationMillis) {
+      this.batchDuration = Durations.milliseconds(durationMillis);
+    }
+    windowing = true;
+  }
+
+  public boolean isWindowing() {
+    return windowing;
+  }
+
+  public Duration getBatchDuration() {
+    return batchDuration;
+  }
+}
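
A sketch of the intended use: before creating the streaming context, the runner would traverse the pipeline with this detector so that the smallest fixed or sliding window becomes Spark's batch interval. This is assumed wiring; the fallback interval for pipelines without windowing is not defined in this file and the one-second value below is purely illustrative:

    StreamingWindowPipelineDetector detector =
        new StreamingWindowPipelineDetector(translator);
    pipeline.traverseTopologically(detector);
    Duration batchDuration = detector.isWindowing()
        ? detector.getBatchDuration()
        : Durations.seconds(1); // hypothetical fallback, not specified here
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);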

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
index 045d5dd..5733a86 100644
--- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
+++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-com.cloudera.dataflow.spark.SparkPipelineOptionsRegistrar
\ No newline at end of file
+com.cloudera.dataflow.spark.SparkPipelineOptionsRegistrar
+com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptionsRegistrar
\ No newline at end of file
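
This registration works through Java's standard java.util.ServiceLoader mechanism: the Dataflow SDK discovers every PipelineOptionsRegistrar listed under META-INF/services, roughly equivalent to the following sketch:

    for (PipelineOptionsRegistrar registrar
        : ServiceLoader.load(PipelineOptionsRegistrar.class)) {
      // SparkStreamingPipelineOptionsRegistrar is now among those returned,
      // making SparkStreamingPipelineOptions available to PipelineOptionsFactory
      System.out.println(registrar.getPipelineOptions());
    }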

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
index 179816d..2df8493 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
@@ -65,7 +65,7 @@ public class MultiOutputWordCountTest {
 
     EvaluationResult res = SparkPipelineRunner.create().run(p);
     Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts));
-    Assert.assertEquals("and", actualLower.iterator().next().getKey());
+    Assert.assertEquals("are", actualLower.iterator().next().getKey());
     Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
     Assert.assertEquals("Here", actualUpper.iterator().next().getKey());
     Iterable<Long> actualUniqCount = res.get(unique);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
index e1d5979..ce7acda 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
@@ -58,7 +58,7 @@ public class SideEffectsTest implements Serializable {
 
       // TODO: remove the version check (and the setup and teardown methods) when we no
       // longer support Spark 1.3 or 1.4
-      String version = SparkContextFactory.getSparkContext(options.getSparkMaster()).version();
+      String version = SparkContextFactory.getSparkContext(options.getSparkMaster(), options.getAppName()).version();
       if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
         assertTrue(e.getCause() instanceof UserException);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
index 0f6db1f..3d85f46 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
@@ -93,7 +93,7 @@ public class SimpleWordCountTest {
     }
   }
 
-  private static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
+  public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
     @Override
     public PCollection<String> apply(PCollection<String> lines) {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/WindowedWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/WindowedWordCountTest.java
new file mode 100644
index 0000000..c16878e
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/WindowedWordCountTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.spark;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import org.joda.time.Duration;
+import org.junit.Test;
+
+public class WindowedWordCountTest {
+  private static final String[] WORDS_ARRAY = {
+          "hi there", "hi", "hi sue bob",
+          "hi sue", "", "bob hi"};
+  private static final Long[] TIMESTAMPS_ARRAY = {
+          60000L, 60000L, 60000L,
+          120000L, 120000L, 120000L};
+  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+  private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY);
+  private static final List<String> EXPECTED_COUNT_SET =
+          ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1",
+                  "hi: 2", "sue: 1", "bob: 1");
+
+  @Test
+  public void testRun() throws Exception {
+    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    options.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
+            .setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedWords = inputWords
+            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+
+    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+
+    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java
new file mode 100644
index 0000000..8778e00
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableMap;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableSet;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import com.cloudera.dataflow.io.KafkaIO;
+import com.cloudera.dataflow.spark.EvaluationResult;
+import com.cloudera.dataflow.spark.SparkPipelineRunner;
+import com.cloudera.dataflow.spark.streaming.utils.DataflowAssertStreaming;
+import com.cloudera.dataflow.spark.streaming.utils.EmbeddedKafkaCluster;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import kafka.serializer.StringDecoder;
+
+/**
+ * Test Kafka as input.
+ */
+public class KafkaStreamingTest {
+  private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
+          new EmbeddedKafkaCluster.EmbeddedZookeeper(17001);
+  private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
+          new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(),
+                  new Properties(), Collections.singletonList(6667));
+  private static final String TOPIC = "kafka_dataflow_test_topic";
+  private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
+          "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
+  );
+  private static final Set<String> EXPECTED = ImmutableSet.of(
+          "k1,v1", "k2,v2", "k3,v3", "k4,v4"
+  );
+  private static final long TEST_TIMEOUT_MSEC = 1000L;
+
+  @BeforeClass
+  public static void init() throws IOException, InterruptedException {
+    EMBEDDED_ZOOKEEPER.startup();
+    EMBEDDED_KAFKA_CLUSTER.startup();
+
+    // write to Kafka
+    Properties producerProps = new Properties();
+    producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
+    producerProps.put("request.required.acks", 1);
+    producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
+    Serializer<String> stringSerializer = new StringSerializer();
+    KafkaProducer<String, String> kafkaProducer =
+            new KafkaProducer<>(producerProps, stringSerializer, stringSerializer);
+    for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) {
+      kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
+    }
+    kafkaProducer.close();
+  }
+
+  @Test
+  public void testRun() throws Exception {
+    // test read from Kafka
+    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    options.setAppName(this.getClass().getSimpleName());
+    options.setRunner(SparkPipelineRunner.class);
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
+    Pipeline p = Pipeline.create(options);
+
+    Map<String, String> kafkaParams = ImmutableMap.of(
+            "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
+            "auto.offset.reset", "smallest"
+    );
+
+    PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(StringDecoder.class,
+        StringDecoder.class, String.class, String.class, Collections.singleton(TOPIC),
+        kafkaParams));
+    PCollection<KV<String, String>> windowedWords = kafkaInput
+        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1))));
+
+    PCollection<String> formattedKV = windowedWords.apply(ParDo.of(new FormatKVFn()));
+
+    DataflowAssert.thatIterable(formattedKV.apply(View.<String>asIterable()))
+        .containsInAnyOrder(EXPECTED);
+
+    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    res.close();
+
+    DataflowAssertStreaming.assertNoFailures(res);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EMBEDDED_KAFKA_CLUSTER.shutdown();
+    EMBEDDED_ZOOKEEPER.shutdown();
+  }
+
+  private static class FormatKVFn extends DoFn<KV<String, String>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getKey() + "," + c.element().getValue());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
new file mode 100644
index 0000000..613e517
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableSet;
+
+import com.cloudera.dataflow.io.ConsoleIO;
+import com.cloudera.dataflow.io.CreateStream;
+import com.cloudera.dataflow.spark.EvaluationResult;
+import com.cloudera.dataflow.spark.SimpleWordCountTest;
+import com.cloudera.dataflow.spark.SparkPipelineRunner;
+import com.cloudera.dataflow.spark.streaming.utils.DataflowAssertStreaming;
+
+import org.joda.time.Duration;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class SimpleStreamingWordCountTest {
+
+  private static final String[] WORDS_ARRAY = {
+      "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
+  private static final List<Iterable<String>> WORDS_QUEUE =
+      Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY));
+  private static final Set<String> EXPECTED_COUNT_SET =
+      ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+  private static final long TEST_TIMEOUT_MSEC = 1000L;
+
+  @Test
+  public void testRun() throws Exception {
+    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    options.setAppName(this.getClass().getSimpleName());
+    options.setRunner(SparkPipelineRunner.class);
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<String> inputWords =
+        p.apply(CreateStream.fromQueue(WORDS_QUEUE)).setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedWords = inputWords
+        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+
+    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+
+    DataflowAssert.thatIterable(output.apply(View.<String>asIterable()))
+        .containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    res.close();
+
+    DataflowAssertStreaming.assertNoFailures(res);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/DataflowAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/DataflowAssertStreaming.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/DataflowAssertStreaming.java
new file mode 100644
index 0000000..c0c5976
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/DataflowAssertStreaming.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming.utils;
+
+import com.cloudera.dataflow.spark.EvaluationResult;
+
+import org.junit.Assert;
+
+/**
+ * Since DataflowAssert doesn't propagate assert exceptions, use Aggregators to assert streaming
+ * success/failure counters.
+ */
+public final class DataflowAssertStreaming {
+  /**
+   * Copied aggregator names from {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert}.
+   */
+  static final String SUCCESS_COUNTER = "DataflowAssertSuccess";
+  static final String FAILURE_COUNTER = "DataflowAssertFailure";
+
+  private DataflowAssertStreaming() {
+  }
+
+  public static void assertNoFailures(EvaluationResult res) {
+    int failures = res.getAggregatorValue(FAILURE_COUNTER, Integer.class);
+    Assert.assertEquals("Found " + failures + " failures, see the log for details", 0, failures);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java
new file mode 100644
index 0000000..6daae54
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java
@@ -0,0 +1,315 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming.utils;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+
+/**
+ * Embedded Kafka cluster (plus embedded ZooKeeper) for tests; a usage sketch follows
+ * this file's diff. Adapted from https://gist.github.com/fjavieralba/7930018
+ */
+public class EmbeddedKafkaCluster {
+  private final List<Integer> ports;
+  private final String zkConnection;
+  private final Properties baseProperties;
+
+  private final String brokerList;
+
+  private final List<KafkaServer> brokers;
+  private final List<File> logDirs;
+
+  public EmbeddedKafkaCluster(String zkConnection) {
+    this(zkConnection, new Properties());
+  }
+
+  public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties) {
+    this(zkConnection, baseProperties, Collections.singletonList(-1));
+  }
+
+  public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties, List<Integer> ports) {
+    this.zkConnection = zkConnection;
+    this.ports = resolvePorts(ports);
+    this.baseProperties = baseProperties;
+
+    this.brokers = new ArrayList<KafkaServer>();
+    this.logDirs = new ArrayList<File>();
+
+    this.brokerList = constructBrokerList(this.ports);
+  }
+
+  private List<Integer> resolvePorts(List<Integer> ports) {
+    List<Integer> resolvedPorts = new ArrayList<Integer>();
+    for (Integer port : ports) {
+      resolvedPorts.add(resolvePort(port));
+    }
+    return resolvedPorts;
+  }
+
+  private int resolvePort(int port) {
+    if (port == -1) {
+      return TestUtils.getAvailablePort();
+    }
+    return port;
+  }
+
+  private String constructBrokerList(List<Integer> ports) {
+    StringBuilder sb = new StringBuilder();
+    for (Integer port : ports) {
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append("localhost:").append(port);
+    }
+    return sb.toString();
+  }
+
+  public void startup() {
+    for (int i = 0; i < ports.size(); i++) {
+      Integer port = ports.get(i);
+      File logDir = TestUtils.constructTempDir("kafka-local");
+
+      Properties properties = new Properties();
+      properties.putAll(baseProperties);
+      properties.setProperty("zookeeper.connect", zkConnection);
+      properties.setProperty("broker.id", String.valueOf(i + 1));
+      properties.setProperty("host.name", "localhost");
+      properties.setProperty("port", Integer.toString(port));
+      properties.setProperty("log.dir", logDir.getAbsolutePath());
+      properties.setProperty("log.flush.interval.messages", String.valueOf(1));
+
+      KafkaServer broker = startBroker(properties);
+
+      brokers.add(broker);
+      logDirs.add(logDir);
+    }
+  }
+
+
+  private KafkaServer startBroker(Properties props) {
+    KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime());
+    server.startup();
+    return server;
+  }
+
+  public Properties getProps() {
+    Properties props = new Properties();
+    props.putAll(baseProperties);
+    props.put("metadata.broker.list", brokerList);
+    props.put("zookeeper.connect", zkConnection);
+    return props;
+  }
+
+  public String getBrokerList() {
+    return brokerList;
+  }
+
+  public List<Integer> getPorts() {
+    return ports;
+  }
+
+  public String getZkConnection() {
+    return zkConnection;
+  }
+
+  public void shutdown() {
+    for (KafkaServer broker : brokers) {
+      try {
+        broker.shutdown();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    for (File logDir : logDirs) {
+      try {
+        TestUtils.deleteFile(logDir);
+      } catch (FileNotFoundException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("EmbeddedKafkaCluster{");
+    sb.append("brokerList='").append(brokerList).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+
+  public static class EmbeddedZookeeper {
+    private int port = -1;
+    private int tickTime = 500;
+
+    private ServerCnxnFactory factory;
+    private File snapshotDir;
+    private File logDir;
+
+    public EmbeddedZookeeper() {
+      this(-1);
+    }
+
+    public EmbeddedZookeeper(int port) {
+      this(port, 500);
+    }
+
+    public EmbeddedZookeeper(int port, int tickTime) {
+      this.port = resolvePort(port);
+      this.tickTime = tickTime;
+    }
+
+    private int resolvePort(int port) {
+      if (port == -1) {
+        return TestUtils.getAvailablePort();
+      }
+      return port;
+    }
+
+    public void startup() throws IOException {
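+      // re-resolve in case the port was reset to -1 via setPort() after construction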
+      if (this.port == -1) {
+        this.port = TestUtils.getAvailablePort();
+      }
+      this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port),
+              1024);
+      this.snapshotDir = TestUtils.constructTempDir("embedded-zk/snapshot");
+      this.logDir = TestUtils.constructTempDir("embedded-zk/log");
+
+      try {
+        factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+
+    public void shutdown() {
+      factory.shutdown();
+      try {
+        TestUtils.deleteFile(snapshotDir);
+      } catch (FileNotFoundException e) {
+        // ignore
+      }
+      try {
+        TestUtils.deleteFile(logDir);
+      } catch (FileNotFoundException e) {
+        // ignore
+      }
+    }
+
+    public String getConnection() {
+      return "localhost:" + port;
+    }
+
+    public void setPort(int port) {
+      this.port = port;
+    }
+
+    public void setTickTime(int tickTime) {
+      this.tickTime = tickTime;
+    }
+
+    public int getPort() {
+      return port;
+    }
+
+    public int getTickTime() {
+      return tickTime;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("EmbeddedZookeeper{");
+      sb.append("connection=").append(getConnection());
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  static class SystemTime implements Time {
+    public long milliseconds() {
+      return System.currentTimeMillis();
+    }
+
+    public long nanoseconds() {
+      return System.nanoTime();
+    }
+
+    public void sleep(long ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        // Ignore
+      }
+    }
+  }
+
+  static class TestUtils {
+    private static final Random RANDOM = new Random();
+
+    private TestUtils() {
+    }
+
+    public static File constructTempDir(String dirPrefix) {
+      File file = new File(System.getProperty("java.io.tmpdir"),
+              dirPrefix + RANDOM.nextInt(10000000));
+      if (!file.mkdirs()) {
+        throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
+      }
+      file.deleteOnExit();
+      return file;
+    }
+
+    public static int getAvailablePort() {
+      try (ServerSocket socket = new ServerSocket(0)) {
+        return socket.getLocalPort();
+      } catch (IOException e) {
+        throw new IllegalStateException("Cannot find available port: " + e.getMessage(), e);
+      }
+    }
+
+    public static boolean deleteFile(File path) throws FileNotFoundException {
+      if (!path.exists()) {
+        throw new FileNotFoundException(path.getAbsolutePath());
+      }
+      boolean ret = true;
+      if (path.isDirectory()) {
+        for (File f : path.listFiles()) {
+          ret = ret && deleteFile(f);
+        }
+      }
+      return ret && path.delete();
+    }
+  }
+}
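
For reference, a minimal usage sketch for the classes above (the test-class name is
illustrative; only the constructors and lifecycle methods shown in this commit are
used):

  import java.io.IOException;

  import org.junit.AfterClass;
  import org.junit.BeforeClass;

  public class KafkaStreamingTest {
    private static final EmbeddedKafkaCluster.EmbeddedZookeeper ZOOKEEPER =
        new EmbeddedKafkaCluster.EmbeddedZookeeper();
    private static EmbeddedKafkaCluster cluster;

    @BeforeClass
    public static void init() throws IOException {
      ZOOKEEPER.startup();
      cluster = new EmbeddedKafkaCluster(ZOOKEEPER.getConnection());
      cluster.startup();
      // Producers/consumers connect via cluster.getBrokerList() or cluster.getProps().
    }

    @AfterClass
    public static void tearDown() {
      cluster.shutdown();
      ZOOKEEPER.shutdown();
    }
  }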

