From: amitsela@apache.org
To: commits@beam.incubator.apache.org
Date: Thu, 10 Mar 2016 20:59:09 -0000
Subject: [44/50] [abbrv] incubator-beam git commit: Add spark-streaming support to spark-dataflow

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
new file mode 100644
index 0000000..57253f0
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+
+import com.cloudera.dataflow.spark.SparkPipelineOptions;
+
+/**
+ * Options used to configure Spark streaming.
+ */
+public interface SparkStreamingPipelineOptions extends SparkPipelineOptions {
+  @Description("Timeout to wait (in msec) for the streaming execution to stop, -1 runs until "
+      + "execution is stopped")
+  @Default.Long(-1)
+  Long getTimeout();
+
+  void setTimeout(Long timeout);
+
+  @Override
+  @Default.Boolean(true)
+  boolean isStreaming();
+
+  @Override
+  @Default.String("spark streaming dataflow pipeline job")
+  String getAppName();
+}
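[Editor's note] A minimal usage sketch, not part of this patch: a driver obtains these options through the SDK's PipelineOptionsFactory (which the factory class below wraps) and configures the streaming timeout before creating the pipeline. The app name used here is hypothetical.

    SparkStreamingPipelineOptions options =
        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
    options.setRunner(SparkPipelineRunner.class);
    options.setAppName("my-streaming-job");  // hypothetical name
    options.setTimeout(60000L);              // stop streaming execution after one minute
    Pipeline p = Pipeline.create(options);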

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
new file mode 100644
index 0000000..3b568af
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+
+public final class SparkStreamingPipelineOptionsFactory {
+
+  private SparkStreamingPipelineOptionsFactory() {
+  }
+
+  public static SparkStreamingPipelineOptions create() {
+    return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..01c4375
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.common.collect.ImmutableList;
+
+public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.<Class<? extends PipelineOptions>>of(
+        SparkStreamingPipelineOptions.class);
+  }
+}
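[Editor's note] This registrar is discovered through Java's ServiceLoader via the META-INF/services entry added near the end of this patch. A sketch of the equivalent explicit registration, assuming only the SDK's standard PipelineOptionsFactory API:

    // Normally unnecessary -- the META-INF/services entry does this automatically.
    PipelineOptionsFactory.register(SparkStreamingPipelineOptions.class);
    SparkStreamingPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkStreamingPipelineOptions.class);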

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
new file mode 100644
index 0000000..5e1b42d
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.cloud.dataflow.sdk.values.PValue;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import com.cloudera.dataflow.spark.EvaluationContext;
+import com.cloudera.dataflow.spark.SparkRuntimeContext;
+
+/**
+ * Evaluation context for streaming pipelines: tracks DStreams in addition to the RDDs
+ * handled by the parent {@link EvaluationContext}.
+ */
+public class StreamingEvaluationContext extends EvaluationContext {
+
+  private final JavaStreamingContext jssc;
+  private final long timeout;
+  private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>();
+  private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>();
+
+  public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
+      JavaStreamingContext jssc, long timeout) {
+    super(jsc, pipeline);
+    this.jssc = jssc;
+    this.timeout = timeout;
+  }
+
+  /**
+   * DStream holder. Can also create a DStream from a supplied queue of values,
+   * mainly for testing.
+   */
+  private class DStreamHolder<T> {
+
+    private Iterable<Iterable<T>> values;
+    private Coder<T> coder;
+    private JavaDStream<WindowedValue<T>> dStream;
+
+    public DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
+      this.values = values;
+      this.coder = coder;
+    }
+
+    public DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
+      this.dStream = dStream;
+    }
+
+    @SuppressWarnings("unchecked")
+    public JavaDStream<WindowedValue<T>> getDStream() {
+      if (dStream == null) {
+        // create the DStream from values
+        Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+        for (Iterable<T> v : values) {
+          setOutputRDDFromValues(currentTransform.getTransform(), v, coder);
+          rddQueue.offer((JavaRDD<WindowedValue<T>>)
+              getOutputRDD(currentTransform.getTransform()));
+        }
+        // create DStream from queue, one at a time, no defaults
+        // mainly for unit test so no reason to have this configurable
+        dStream = jssc.queueStream(rddQueue, true);
+      }
+      return dStream;
+    }
+  }
+
+  public <T> void setDStreamFromQueue(PTransform<?, ?> transform, Iterable<Iterable<T>> values,
+      Coder<T> coder) {
+    pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder));
+  }
+
+  public <T, R extends JavaRDDLike<WindowedValue<T>, R>>
+      void setStream(PTransform<?, ?> transform, JavaDStreamLike<WindowedValue<T>, ?, R> dStream) {
+    PValue pvalue = (PValue) getOutput(transform);
+    @SuppressWarnings("unchecked")
+    DStreamHolder<T> dStreamHolder = new DStreamHolder<>((JavaDStream<WindowedValue<T>>) dStream);
+    pstreams.put(pvalue, dStreamHolder);
+    leafStreams.add(dStreamHolder);
+  }
+
+  boolean hasStream(PTransform<?, ?> transform) {
+    PValue pvalue = (PValue) getInput(transform);
+    return pstreams.containsKey(pvalue);
+  }
+
+  public JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
+    PValue pvalue = (PValue) getInput(transform);
+    DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
+    JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
+    leafStreams.remove(dStreamHolder);
+    return dStream;
+  }
+
+  // used to set the RDD from the DStream in the RDDHolder for transformation
+  public <T> void setInputRDD(PTransform<? extends PInput, ?> transform,
+      JavaRDDLike<WindowedValue<T>, ?> rdd) {
+    setRDD((PValue) getInput(transform), rdd);
+  }
+
+  // used to get the RDD transformation output and use it as the DStream transformation output
+  public JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
+    return getRDD((PValue) getOutput(transform));
+  }
+
+  public JavaStreamingContext getStreamingContext() {
+    return jssc;
+  }
+
+  @Override
+  protected void computeOutputs() {
+    for (DStreamHolder<?> streamHolder : leafStreams) {
+      @SuppressWarnings("unchecked")
+      JavaDStream<WindowedValue<?>> stream = (JavaDStream) streamHolder.getDStream();
+      stream.foreachRDD(new Function<JavaRDD<WindowedValue<?>>, Void>() {
+        @Override
+        public Void call(JavaRDD<WindowedValue<?>> rdd) throws Exception {
+          rdd.rdd().cache();
+          rdd.count();
+          return null;
+        }
+      }); // force a DStream action
+    }
+  }
+
+  @Override
+  public void close() {
+    if (timeout > 0) {
+      jssc.awaitTerminationOrTimeout(timeout);
+    } else {
+      jssc.awaitTermination();
+    }
+    // TODO: stop gracefully?
+    jssc.stop(false, false);
+    state = State.DONE;
+    super.close();
+  }
+
+  private State state = State.RUNNING;
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  //---------------- override in order to expose in package
+  @Override
+  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
+    return super.getOutput(transform);
+  }
+
+  @Override
+  protected JavaSparkContext getSparkContext() {
+    return super.getSparkContext();
+  }
+
+  @Override
+  protected SparkRuntimeContext getRuntimeContext() {
+    return super.getRuntimeContext();
+  }
+
+  @Override
+  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+    super.setCurrentTransform(transform);
+  }
+
+  @Override
+  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return super.getCurrentTransform();
+  }
+
+  @Override
+  protected <T> void setOutputRDD(PTransform<?, ?> transform,
+      JavaRDDLike<WindowedValue<T>, ?> rdd) {
+    super.setOutputRDD(transform, rdd);
+  }
+
+  @Override
+  protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
+      Coder<T> coder) {
+    super.setOutputRDDFromValues(transform, values, coder);
+  }
+
+  @Override
+  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
+    return super.hasOutputRDD(transform);
+  }
+}
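[Editor's note] The queue-backed DStreamHolder above leans on Spark's queueStream: with oneAtATime set to true, each queued RDD becomes exactly one micro-batch. A self-contained sketch of that underlying Spark behavior; master, app name, and intervals are arbitrary and not taken from this patch:

    import java.util.Arrays;
    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("queue-demo");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    Queue<JavaRDD<String>> queue = new LinkedBlockingQueue<>();
    queue.offer(jssc.sparkContext().parallelize(Arrays.asList("first", "batch")));
    queue.offer(jssc.sparkContext().parallelize(Arrays.asList("second", "batch")));
    JavaDStream<String> stream = jssc.queueStream(queue, true); // one queued RDD per interval
    stream.print();
    jssc.start();
    jssc.awaitTerminationOrTimeout(3000); // bounded wait, as close() does above
    jssc.stop(false, false);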

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
new file mode 100644
index 0000000..20ee88a
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
@@ -0,0 +1,409 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.api.client.util.Maps;
+import com.google.api.client.util.Sets;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.reflect.TypeToken;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+import kafka.serializer.Decoder;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
+import org.apache.spark.streaming.api.java.JavaPairInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+
+import scala.Tuple2;
+
+import com.cloudera.dataflow.hadoop.HadoopIO;
+import com.cloudera.dataflow.io.ConsoleIO;
+import com.cloudera.dataflow.io.CreateStream;
+import com.cloudera.dataflow.io.KafkaIO;
+import com.cloudera.dataflow.spark.DoFnFunction;
+import com.cloudera.dataflow.spark.EvaluationContext;
+import com.cloudera.dataflow.spark.SparkPipelineTranslator;
+import com.cloudera.dataflow.spark.TransformEvaluator;
+import com.cloudera.dataflow.spark.TransformTranslator;
+import com.cloudera.dataflow.spark.WindowingHelpers;
+
+/**
+ * Supports translation between a Dataflow transform and Spark's operations on DStreams.
+ */
+public final class StreamingTransformTranslator {
+
+  private StreamingTransformTranslator() {
+  }
+
+  private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
+    return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
+      @Override
+      public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
+        @SuppressWarnings("unchecked")
+        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
+            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
+            ((StreamingEvaluationContext) context).getStream(transform);
+        dstream.map(WindowingHelpers.<T>unwindowFunction())
+            .print(transform.getNum());
+      }
+    };
+  }
+
+  private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
+    return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
+      @Override
+      public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
+        JavaStreamingContext jssc = ((StreamingEvaluationContext) context).getStreamingContext();
+        Class<K> keyClazz = transform.getKeyClass();
+        Class<V> valueClazz = transform.getValueClass();
+        Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
+        Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass();
+        Map<String, String> kafkaParams = transform.getKafkaParams();
+        Set<String> topics = transform.getTopics();
+        JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz,
+            valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics);
+        JavaDStream<WindowedValue<KV<K, V>>> inputStream =
+            inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() {
+          @Override
+          public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
+            return KV.of(t2._1(), t2._2());
+          }
+        }).map(WindowingHelpers.<KV<K, V>>windowFunction());
+        ((StreamingEvaluationContext) context).setStream(transform, inputStream);
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<Create.Values<T>> create() {
+    return new TransformEvaluator<Create.Values<T>>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void evaluate(Create.Values<T> transform, EvaluationContext context) {
+        Iterable<T> elems = transform.getElements();
+        Coder<T> coder = ((StreamingEvaluationContext) context).getOutput(transform)
+            .getCoder();
+        if (coder != VoidCoder.of()) {
+          // actual create
+          ((StreamingEvaluationContext) context).setOutputRDDFromValues(transform,
+              elems, coder);
+        } else {
+          // fake create as an input
+          // creates a stream with a single batch containing a single null element
+          // to invoke following transformations once
+          // to support DataflowAssert
+          ((StreamingEvaluationContext) context).setDStreamFromQueue(transform,
+              Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)),
+              (Coder<Void>) coder);
+        }
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
+    return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
+      @Override
+      public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
+        Iterable<Iterable<T>> values = transform.getQueuedValues();
+        Coder<T> coder = ((StreamingEvaluationContext) context).getOutput(transform)
+            .getCoder();
+        ((StreamingEvaluationContext) context).setDStreamFromQueue(transform, values,
+            coder);
+      }
+    };
+  }
+
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform(
+      final SparkPipelineTranslator rddTranslator) {
+    return new TransformEvaluator<PT>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void evaluate(final PT transform,
+                           final EvaluationContext context) {
+        final TransformEvaluator<PT> rddEvaluator =
+            rddTranslator.translate((Class<PT>) transform.getClass());
+
+        if (((StreamingEvaluationContext) context).hasStream(transform)) {
+          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
+              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
+              ((StreamingEvaluationContext) context).getStream(transform);
+
+          ((StreamingEvaluationContext) context).setStream(transform, dStream
+              .transform(new RDDTransform<>((StreamingEvaluationContext) context,
+              rddEvaluator, transform)));
+        } else {
+          // if the transformation requires direct access to RDD (not in stream)
+          // this is used for "fake" transformations like with DataflowAssert
+          rddEvaluator.evaluate(transform, context);
+        }
+      }
+    };
+  }
+
+  /**
+   * RDD transform function. If the transformation doesn't produce an output RDD,
+   * a fake (empty) one is created in its place.
+   *
+   * @param <PT> PTransform type
+   */
+  private static final class RDDTransform<PT extends PTransform<?, ?>>
+      implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
+
+    private final StreamingEvaluationContext context;
+    private final AppliedPTransform<?, ?, ?> appliedPTransform;
+    private final TransformEvaluator<PT> rddEvaluator;
+    private final PT transform;
+
+    private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator,
+        PT transform) {
+      this.context = context;
+      this.appliedPTransform = context.getCurrentTransform();
+      this.rddEvaluator = rddEvaluator;
+      this.transform = transform;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public JavaRDD<WindowedValue<Object>>
+        call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
+      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
+      context.setCurrentTransform(appliedPTransform);
+      context.setInputRDD(transform, rdd);
+      rddEvaluator.evaluate(transform, context);
+      if (!context.hasOutputRDD(transform)) {
+        // fake RDD as output
+        context.setOutputRDD(transform,
+            context.getSparkContext().<WindowedValue<Object>>emptyRDD());
+      }
+      JavaRDD<WindowedValue<Object>> outRDD =
+          (JavaRDD<WindowedValue<Object>>) context.getOutputRDD(transform);
+      context.setCurrentTransform(existingAPT);
+      return outRDD;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD(
+      final SparkPipelineTranslator rddTranslator) {
+    return new TransformEvaluator<PT>() {
+      @Override
+      public void evaluate(final PT transform,
+                           final EvaluationContext context) {
+        final TransformEvaluator<PT> rddEvaluator =
+            rddTranslator.translate((Class<PT>) transform.getClass());
+
+        if (((StreamingEvaluationContext) context).hasStream(transform)) {
+          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
+              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>) (
+              (StreamingEvaluationContext) context).getStream(transform);
+
+          dStream.foreachRDD(new RDDOutputOperator<>((StreamingEvaluationContext) context,
+              rddEvaluator, transform));
+        } else {
+          rddEvaluator.evaluate(transform, context);
+        }
+      }
+    };
+  }
+
+  /**
+   * RDD output function.
+   *
+   * @param <PT> PTransform type
+   */
+  private static final class RDDOutputOperator<PT extends PTransform<?, ?>>
+      implements Function<JavaRDD<WindowedValue<Object>>, Void> {
+
+    private final StreamingEvaluationContext context;
+    private final AppliedPTransform<?, ?, ?> appliedPTransform;
+    private final TransformEvaluator<PT> rddEvaluator;
+    private final PT transform;
+
+    private RDDOutputOperator(StreamingEvaluationContext context,
+        TransformEvaluator<PT> rddEvaluator, PT transform) {
+      this.context = context;
+      this.appliedPTransform = context.getCurrentTransform();
+      this.rddEvaluator = rddEvaluator;
+      this.transform = transform;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
+      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
+      context.setCurrentTransform(appliedPTransform);
+      context.setInputRDD(transform, rdd);
+      rddEvaluator.evaluate(transform, context);
+      context.setCurrentTransform(existingAPT);
+      return null;
+    }
+  }
+
+  static final TransformTranslator.FieldGetter WINDOW_FG =
+      new TransformTranslator.FieldGetter(Window.Bound.class);
+
+  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
+    return new TransformEvaluator<Window.Bound<T>>() {
+      @Override
+      public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
+        //--- first we apply windowing to the stream
+        WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<T>> dStream =
+            (JavaDStream<WindowedValue<T>>)
+            ((StreamingEvaluationContext) context).getStream(transform);
+        if (windowFn instanceof FixedWindows) {
+          Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize()
+              .getMillis());
+          ((StreamingEvaluationContext) context)
+              .setStream(transform, dStream.window(windowDuration));
+        } else if (windowFn instanceof SlidingWindows) {
+          Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize()
+              .getMillis());
+          Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod()
+              .getMillis());
+          ((StreamingEvaluationContext) context)
+              .setStream(transform, dStream.window(windowDuration, slideDuration));
+        }
+        //--- then we apply windowing to the elements
+        DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
+        DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn,
+            ((StreamingEvaluationContext) context).getRuntimeContext(), null);
+        @SuppressWarnings("unchecked")
+        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
+            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
+            ((StreamingEvaluationContext) context).getStream(transform);
+        //noinspection unchecked
+        ((StreamingEvaluationContext) context).setStream(transform,
+            dstream.mapPartitions(dofn));
+      }
+    };
+  }
+
+  private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS =
+      Maps.newHashMap();
+
+  static {
+    EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
+    EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
+    EVALUATORS.put(Create.Values.class, create());
+    EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
+    EVALUATORS.put(Window.Bound.class, window());
+  }
+
+  private static final Set<Class<? extends PTransform>> UNSUPPORTED_EVALUATORS =
+      Sets.newHashSet();
+
+  static {
+    // TODO - add support for the following
+    UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(Flatten.FlattenPCollectionList.class);
+  }
+
+  private static <PT extends PTransform<?, ?>> boolean hasTransformEvaluator(Class<PT> clazz) {
+    return EVALUATORS.containsKey(clazz);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
+      getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) {
+    TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
+    if (transform == null) {
+      if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
+        throw new UnsupportedOperationException("Dataflow transformation " + clazz
+            .getCanonicalName()
+            + " is currently unsupported by the Spark streaming pipeline");
+      }
+      // DStream transformations will transform an RDD into another RDD
+      // Actions will create output
+      // In Dataflow it depends on the PTransform's Input and Output class
+      Class<?> pTOutputClazz = getPTransformOutputClazz(clazz);
+      if (pTOutputClazz == PDone.class) {
+        return foreachRDD(rddTranslator);
+      } else {
+        return rddTransform(rddTranslator);
+      }
+    }
+    return transform;
+  }
+
+  private static <PT extends PTransform<?, ?>> Class<?>
+      getPTransformOutputClazz(Class<PT> clazz) {
+    Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
+    return TypeToken.of(clazz).resolveType(types[1]).getRawType();
+  }
+
+  /**
+   * Translator matches Dataflow transformation with the appropriate Spark streaming evaluator.
+   * rddTranslator uses Spark evaluators in transform/foreachRDD to evaluate the transformation
+   */
+  public static class Translator implements SparkPipelineTranslator {
+
+    private final SparkPipelineTranslator rddTranslator;
+
+    public Translator(SparkPipelineTranslator rddTranslator) {
+      this.rddTranslator = rddTranslator;
+    }
+
+    @Override
+    public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
+      // streaming includes rdd transformations as well
+      return hasTransformEvaluator(clazz) || rddTranslator.hasTranslation(clazz);
+    }
+
+    @Override
+    public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(
+        Class<PT> clazz) {
+      return getTransformEvaluator(clazz, rddTranslator);
+    }
+  }
+}
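[Editor's note] The batch-evaluator reuse above boils down to Spark's transform: an RDD-to-RDD function applied once per micro-batch. Stripped of the Dataflow plumbing, the pattern RDDTransform implements looks like this sketch (names invented for illustration):

    // Per micro-batch, run an arbitrary RDD-to-RDD function -- a stand-in
    // for RDDTransform invoking a batch TransformEvaluator on each batch.
    JavaDStream<String> output = input.transform(
        new Function<JavaRDD<String>, JavaRDD<String>>() {
          @Override
          public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
            return rdd.filter(new Function<String, Boolean>() {
              @Override
              public Boolean call(String s) {
                return !s.isEmpty();
              }
            });
          }
        });

For transforms that produce PDone, the same idea is applied through foreachRDD, which runs the evaluator per batch for its side effects instead of producing a new stream.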

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
new file mode 100644
index 0000000..f9b2d2b
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+
+import com.cloudera.dataflow.spark.SparkPipelineRunner;
+import com.cloudera.dataflow.spark.SparkPipelineTranslator;
+import com.cloudera.dataflow.spark.TransformTranslator;
+
+/**
+ * Pipeline {@link SparkPipelineRunner.Evaluator} to detect windowing.
+ */
+public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator {
+
+  // Currently, Spark streaming recommends batches no smaller than 500 msec
+  private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);
+
+  private boolean windowing;
+  private Duration batchDuration;
+
+  public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) {
+    super(translator);
+  }
+
+  static final TransformTranslator.FieldGetter WINDOW_FG =
+      new TransformTranslator.FieldGetter(Window.Bound.class);
+
+  // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
+  protected <PT extends PTransform<? super PInput, POutput>> void
+      doVisitTransform(TransformTreeNode node) {
+    @SuppressWarnings("unchecked")
+    PT transform = (PT) node.getTransform();
+    @SuppressWarnings("unchecked")
+    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
+    if (transformClass.isAssignableFrom(Window.Bound.class)) {
+      WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
+      if (windowFn instanceof FixedWindows) {
+        setBatchDuration(((FixedWindows) windowFn).getSize());
+      } else if (windowFn instanceof SlidingWindows) {
+        if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) {
+          throw new UnsupportedOperationException("Spark does not support window offsets");
+        }
+        // Sliding window size might as well set the batch duration. Applying the transformation
+        // will add the "slide"
+        setBatchDuration(((SlidingWindows) windowFn).getSize());
+      } else if (!(windowFn instanceof GlobalWindows)) {
+        throw new IllegalStateException("Windowing function not supported: " + windowFn);
+      }
+    }
+  }
+
+  private void setBatchDuration(org.joda.time.Duration duration) {
+    long durationMillis = duration.getMillis();
+    // validate window size
+    if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) {
+      throw new IllegalArgumentException("Windowing of size " + durationMillis
+          + " msec is not supported!");
+    }
+    // choose the smallest duration to be Spark's batch duration, larger ones will be handled
+    // as window functions over the batched-stream
+    if (!windowing || this.batchDuration.milliseconds() > durationMillis) {
+      this.batchDuration = Durations.milliseconds(durationMillis);
+    }
+    windowing = true;
+  }
+
+  public boolean isWindowing() {
+    return windowing;
+  }
+
+  public Duration getBatchDuration() {
+    return batchDuration;
+  }
+}
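[Editor's note] A worked example of the selection rule above: a pipeline containing FixedWindows of 5 seconds and SlidingWindows of size 2 seconds yields a batch duration of 2000 msec, since the smallest window size wins; the 5-second fixed window is then realized by the translator as dStream.window(...) over those 2-second batches. A window of, say, 300 msec would be rejected by the 500 msec floor.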

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
index 045d5dd..5733a86 100644
--- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
+++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-com.cloudera.dataflow.spark.SparkPipelineOptionsRegistrar
\ No newline at end of file
+com.cloudera.dataflow.spark.SparkPipelineOptionsRegistrar
+com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptionsRegistrar
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
index 179816d..2df8493 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
@@ -65,7 +65,7 @@ public class MultiOutputWordCountTest {
 
     EvaluationResult res = SparkPipelineRunner.create().run(p);
     Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts));
-    Assert.assertEquals("and", actualLower.iterator().next().getKey());
+    Assert.assertEquals("are", actualLower.iterator().next().getKey());
     Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
     Assert.assertEquals("Here", actualUpper.iterator().next().getKey());
     Iterable<Long> actualUniqCount = res.get(unique);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
index e1d5979..ce7acda 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
@@ -58,7 +58,7 @@ public class SideEffectsTest implements Serializable {
 
       // TODO: remove the version check (and the setup and teardown methods) when we no
       // longer support Spark 1.3 or 1.4
-      String version = SparkContextFactory.getSparkContext(options.getSparkMaster()).version();
+      String version = SparkContextFactory.getSparkContext(options.getSparkMaster(), options.getAppName()).version();
       if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
         assertTrue(e.getCause() instanceof UserException);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
index 0f6db1f..3d85f46 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
@@ -93,7 +93,7 @@ public class SimpleWordCountTest {
     }
   }
 
-  private static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
+  public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
 
     @Override
     public PCollection<String> apply(PCollection<String> lines) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git
a/runners/spark/src/test/java/com/cloudera/dataflow/spark/WindowedWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/WindowedWordCountTest.java
new file mode 100644
index 0000000..c16878e
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/WindowedWordCountTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.spark;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import org.joda.time.Duration;
+import org.junit.Test;
+
+public class WindowedWordCountTest {
+  private static final String[] WORDS_ARRAY = {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+  private static final Long[] TIMESTAMPS_ARRAY = {
+      60000L, 60000L, 60000L,
+      120000L, 120000L, 120000L};
+  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+  private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY);
+  private static final List<String> EXPECTED_COUNT_SET =
+      ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1",
+          "hi: 2", "sue: 1", "bob: 1");
+
+  @Test
+  public void testRun() throws Exception {
+    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    options.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
+        .setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedWords = inputWords
+        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+
+    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+
+    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java
new file mode 100644
index 0000000..8778e00
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc.
+ * licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableMap;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableSet;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import com.cloudera.dataflow.io.KafkaIO;
+import com.cloudera.dataflow.spark.EvaluationResult;
+import com.cloudera.dataflow.spark.SparkPipelineRunner;
+import com.cloudera.dataflow.spark.streaming.utils.DataflowAssertStreaming;
+import com.cloudera.dataflow.spark.streaming.utils.EmbeddedKafkaCluster;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import kafka.serializer.StringDecoder;
+
+/**
+ * Test Kafka as input.
+ */
+public class KafkaStreamingTest {
+  private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
+      new EmbeddedKafkaCluster.EmbeddedZookeeper(17001);
+  private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
+      new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(),
+          new Properties(), Collections.singletonList(6667));
+  private static final String TOPIC = "kafka_dataflow_test_topic";
+  private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
+      "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
+  );
+  private static final Set<String> EXPECTED = ImmutableSet.of(
+      "k1,v1", "k2,v2", "k3,v3", "k4,v4"
+  );
+  private static final long TEST_TIMEOUT_MSEC = 1000L;
+
+  @BeforeClass
+  public static void init() throws IOException, InterruptedException {
+    EMBEDDED_ZOOKEEPER.startup();
+    EMBEDDED_KAFKA_CLUSTER.startup();
+
+    // write to Kafka
+    Properties producerProps = new Properties();
+    producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
+    producerProps.put("request.required.acks", 1);
+    producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
+    Serializer<String> stringSerializer = new StringSerializer();
+    @SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
+        new KafkaProducer(producerProps, stringSerializer, stringSerializer);
+    for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) {
+      kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
+    }
+    kafkaProducer.close();
+  }
+
+  @Test
+  public void testRun() throws Exception {
+    // test read from Kafka
+    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    options.setAppName(this.getClass().getSimpleName());
+    options.setRunner(SparkPipelineRunner.class);
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
+    Pipeline p = Pipeline.create(options);
+
+    Map<String, String> kafkaParams = ImmutableMap.of(
+        "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
+        "auto.offset.reset", "smallest"
+    );
+
+    PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(StringDecoder.class,
+        StringDecoder.class, String.class, String.class, Collections.singleton(TOPIC),
+        kafkaParams));
+    PCollection<KV<String, String>> windowedWords = kafkaInput
+        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1))));
+
+    PCollection<String> formattedKV = windowedWords.apply(ParDo.of(new FormatKVFn()));
+
+    DataflowAssert.thatIterable(formattedKV.apply(View.asIterable()))
+        .containsInAnyOrder(EXPECTED);
+
+    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    res.close();
+
+    DataflowAssertStreaming.assertNoFailures(res);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EMBEDDED_KAFKA_CLUSTER.shutdown();
+    EMBEDDED_ZOOKEEPER.shutdown();
+  }
+
+  private static class FormatKVFn extends DoFn<KV<String, String>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getKey() + "," + c.element().getValue());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
new file mode 100644
index 0000000..613e517
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableSet;
+
+import com.cloudera.dataflow.io.ConsoleIO;
+import com.cloudera.dataflow.io.CreateStream;
+import com.cloudera.dataflow.spark.EvaluationResult;
+import com.cloudera.dataflow.spark.SimpleWordCountTest;
+import com.cloudera.dataflow.spark.SparkPipelineRunner;
+import com.cloudera.dataflow.spark.streaming.utils.DataflowAssertStreaming;
+
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class SimpleStreamingWordCountTest {
+
+  private static final String[] WORDS_ARRAY = {
+      "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
+  private static final List<Iterable<String>> WORDS_QUEUE =
+      Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY));
+  private static final Set<String> EXPECTED_COUNT_SET =
+      ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+  static final long TEST_TIMEOUT_MSEC = 1000L;
+
+  @Test
+  public void testRun() throws Exception {
+    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    options.setAppName(this.getClass().getSimpleName());
+    options.setRunner(SparkPipelineRunner.class);
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<String> inputWords =
+        p.apply(CreateStream.fromQueue(WORDS_QUEUE)).setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedWords = inputWords
+        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+
+    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+
+    DataflowAssert.thatIterable(output.apply(View.asIterable()))
+        .containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    res.close();
+
+    DataflowAssertStreaming.assertNoFailures(res);
+  }
+}
\ No newline at end of file
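[Editor's note] Note how WORDS_QUEUE shapes the stream: every element of the outer Iterable becomes one micro-batch, because DStreamHolder builds the stream with queueStream's oneAtATime mode. A hypothetical two-batch variant, for illustration only:

    // Each inner Iterable is emitted as its own micro-batch.
    List<Iterable<String>> twoBatches = Arrays.<Iterable<String>>asList(
        Arrays.asList("hi there", "hi", "hi sue bob"),
        Arrays.asList("hi sue", "", "bob hi"));
    PCollection<String> input =
        p.apply(CreateStream.fromQueue(twoBatches)).setCoder(StringUtf8Coder.of());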

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/DataflowAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/DataflowAssertStreaming.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/DataflowAssertStreaming.java
new file mode 100644
index 0000000..c0c5976
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/DataflowAssertStreaming.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming.utils;
+
+import com.cloudera.dataflow.spark.EvaluationResult;
+
+import org.junit.Assert;
+
+/**
+ * Since DataflowAssert doesn't propagate assert exceptions, use Aggregators to assert streaming
+ * success/failure counters.
+ */
+public final class DataflowAssertStreaming {
+  /**
+   * Copied aggregator names from {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert}
+   */
+  static final String SUCCESS_COUNTER = "DataflowAssertSuccess";
+  static final String FAILURE_COUNTER = "DataflowAssertFailure";
+
+  private DataflowAssertStreaming() {
+  }
+
+  public static void assertNoFailures(EvaluationResult res) {
+    int failures = res.getAggregatorValue(FAILURE_COUNTER, Integer.class);
+    Assert.assertEquals("Found " + failures + " failures, see the log for details", 0, failures);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java
new file mode 100644
index 0000000..6daae54
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java
@@ -0,0 +1,315 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming.utils;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+
+/**
+ * https://gist.github.com/fjavieralba/7930018
+ */
+public class EmbeddedKafkaCluster {
+  private final List<Integer> ports;
+  private final String zkConnection;
+  private final Properties baseProperties;
+
+  private final String brokerList;
+
+  private final List<KafkaServer> brokers;
+  private final List<File> logDirs;
+
+  public EmbeddedKafkaCluster(String zkConnection) {
+    this(zkConnection, new Properties());
+  }
+
+  public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties) {
+    this(zkConnection, baseProperties, Collections.singletonList(-1));
+  }
+
+  public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties,
+      List<Integer> ports) {
+    this.zkConnection = zkConnection;
+    this.ports = resolvePorts(ports);
+    this.baseProperties = baseProperties;
+
+    this.brokers = new ArrayList<>();
+    this.logDirs = new ArrayList<>();
+
+    this.brokerList = constructBrokerList(this.ports);
+  }
+
+  private List<Integer> resolvePorts(List<Integer> ports) {
+    List<Integer> resolvedPorts = new ArrayList<>();
+    for (Integer port : ports) {
+      resolvedPorts.add(resolvePort(port));
+    }
+    return resolvedPorts;
+  }
+
+  private int resolvePort(int port) {
+    if (port == -1) {
+      return TestUtils.getAvailablePort();
+    }
+    return port;
+  }
+
+  private String constructBrokerList(List<Integer> ports) {
+    StringBuilder sb = new StringBuilder();
+    for (Integer port : ports) {
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append("localhost:").append(port);
+    }
+    return sb.toString();
+  }
+
+  public void startup() {
+    for (int i = 0; i < ports.size(); i++) {
+      Integer port = ports.get(i);
+      File logDir = TestUtils.constructTempDir("kafka-local");
+
+      Properties properties = new Properties();
+      properties.putAll(baseProperties);
+      properties.setProperty("zookeeper.connect", zkConnection);
+      properties.setProperty("broker.id", String.valueOf(i + 1));
+      properties.setProperty("host.name", "localhost");
+      properties.setProperty("port", Integer.toString(port));
+      properties.setProperty("log.dir", logDir.getAbsolutePath());
+      properties.setProperty("log.flush.interval.messages", String.valueOf(1));
+
+      KafkaServer broker = startBroker(properties);
+
+      brokers.add(broker);
+      logDirs.add(logDir);
+    }
+  }
+
+  private KafkaServer startBroker(Properties props) {
+    KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime());
+    server.startup();
+    return server;
+  }
+
+  public Properties getProps() {
+    Properties props = new Properties();
+    props.putAll(baseProperties);
+    props.put("metadata.broker.list", brokerList);
+    props.put("zookeeper.connect", zkConnection);
+    return props;
+  }
+
+  public String getBrokerList() {
+    return brokerList;
+  }
+
+  public List<Integer> getPorts() {
+    return ports;
+  }
+
+  public String getZkConnection() {
+    return zkConnection;
+  }
+
+  public void shutdown() {
+    for (KafkaServer broker : brokers) {
+      try {
+        broker.shutdown();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    for (File logDir : logDirs) {
+      try {
+        TestUtils.deleteFile(logDir);
+      } catch (FileNotFoundException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("EmbeddedKafkaCluster{");
+    sb.append("brokerList='").append(brokerList).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+
+  public static class EmbeddedZookeeper {
+    private int port = -1;
+    private int tickTime = 500;
+
+    private ServerCnxnFactory factory;
+    private File snapshotDir;
+    private File logDir;
+
+    public EmbeddedZookeeper() {
+      this(-1);
+    }
+
+    public EmbeddedZookeeper(int port) {
+      this(port, 500);
+    }
+
+    public EmbeddedZookeeper(int port, int tickTime) {
+      this.port = resolvePort(port);
+      this.tickTime = tickTime;
+    }
+
+    private int resolvePort(int port) {
+      if (port == -1) {
+        return TestUtils.getAvailablePort();
+      }
+      return port;
+    }
+
+    public void startup() throws IOException {
+      if (this.port == -1) {
+        this.port = TestUtils.getAvailablePort();
+      }
+      this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port),
+          1024);
+      this.snapshotDir = TestUtils.constructTempDir("embedded-zk/snapshot");
+      this.logDir = TestUtils.constructTempDir("embedded-zk/log");
+
+      try {
+        factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    public void shutdown() {
+      factory.shutdown();
+      try {
+        TestUtils.deleteFile(snapshotDir);
+      } catch (FileNotFoundException e) {
+        // ignore
+      }
+      try {
+        TestUtils.deleteFile(logDir);
+      } catch (FileNotFoundException e) {
+        // ignore
+      }
+    }
+
+    public String getConnection() {
+      return "localhost:" + port;
+    }
+
+    public void setPort(int port) {
+      this.port = port;
+    }
+
+    public void setTickTime(int tickTime) {
+      this.tickTime = tickTime;
+    }
+
+    public int getPort() {
+      return port;
+    }
+
+    public int getTickTime() {
+      return tickTime;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("EmbeddedZookeeper{");
+      sb.append("connection=").append(getConnection());
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  static class SystemTime implements Time {
+    public long milliseconds() {
+      return System.currentTimeMillis();
+    }
+
+    public long nanoseconds() {
+      return System.nanoTime();
+    }
+
+    public void sleep(long ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        // Ignore
+      }
+    }
+  }
+
+  static class TestUtils {
+    private static final Random RANDOM = new Random();
+
+    private TestUtils() {
+    }
+
+    public static File constructTempDir(String dirPrefix) {
+      File file = new File(System.getProperty("java.io.tmpdir"),
+          dirPrefix + RANDOM.nextInt(10000000));
+      if (!file.mkdirs()) {
+        throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
+      }
+      file.deleteOnExit();
+      return file;
+    }
+
+    public static int getAvailablePort() {
+      try {
+        ServerSocket socket = new ServerSocket(0);
+        try {
+          return socket.getLocalPort();
+        } finally {
+          socket.close();
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException("Cannot find available port: " + e.getMessage(), e);
+      }
+    }
+
+    public static boolean deleteFile(File path) throws FileNotFoundException {
+      if (!path.exists()) {
+        throw new FileNotFoundException(path.getAbsolutePath());
+      }
+      boolean ret = true;
+      if (path.isDirectory()) {
+        for (File f : path.listFiles()) {
+          ret = ret && deleteFile(f);
+        }
+      }
+      return ret && path.delete();
+    }
+  }
+}
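[Editor's note] A usage sketch of the embedded cluster, mirroring the @BeforeClass/@AfterClass pair in KafkaStreamingTest above; passing -1 lets the helpers pick a free port:

    EmbeddedKafkaCluster.EmbeddedZookeeper zk =
        new EmbeddedKafkaCluster.EmbeddedZookeeper(-1);
    zk.startup();
    EmbeddedKafkaCluster kafka = new EmbeddedKafkaCluster(zk.getConnection());
    kafka.startup();
    try {
      Properties producerProps = new Properties();
      producerProps.putAll(kafka.getProps());
      producerProps.put("bootstrap.servers", kafka.getBrokerList());
      // ... produce to, and consume from, the localhost brokers ...
    } finally {
      kafka.shutdown();
      zk.shutdown();
    }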