From: lresende@apache.org
To: commits@bahir.apache.org
Reply-To: dev@bahir.apache.org
Subject: bahir git commit: [BAHIR-116] Add spark streaming connector to Google Cloud Pub/Sub
Date: Wed, 7 Jun 2017 04:09:58 +0000 (UTC)

Repository: bahir
Updated Branches:
  refs/heads/master 2a4307657 -> 56613263c


[BAHIR-116] Add spark streaming connector to Google Cloud Pub/Sub

Closes #42.
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/56613263
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/56613263
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/56613263

Branch: refs/heads/master
Commit: 56613263ca405aa5b45f32565ad4641f0a7b9752
Parents: 2a43076
Author: Chen Bin
Authored: Thu Apr 27 17:18:32 2017 +0800
Committer: Luciano Resende
Committed: Tue Jun 6 21:03:49 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   1 +
 streaming-pubsub/README.md                      |  45 +++
 .../PubsubWordCount.scala                       | 159 +++++++++++
 streaming-pubsub/pom.xml                        |  86 ++++++
 .../streaming/pubsub/PubsubInputDStream.scala   | 286 +++++++++++++++++++
 .../spark/streaming/pubsub/PubsubUtils.scala    | 105 +++++++
 .../streaming/pubsub/SparkGCPCredentials.scala  | 166 +++++++++++
 .../streaming/LocalJavaStreamingContext.java    |  44 +++
 .../streaming/pubsub/JavaPubsubStreamSuite.java |  38 +++
 .../src/test/resources/log4j.properties         |  28 ++
 .../spark/streaming/pubsub/PubsubFunSuite.scala |  46 +++
 .../streaming/pubsub/PubsubStreamSuite.scala    | 138 +++++++
 .../streaming/pubsub/PubsubTestUtils.scala      | 142 +++++++
 .../SparkGCPCredentialsBuilderSuite.scala       |  95 ++++++
 14 files changed, 1379 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f76aac5..81f2e28 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@
     <module>sql-streaming-mqtt</module>
     <module>streaming-twitter</module>
     <module>streaming-zeromq</module>
+    <module>streaming-pubsub</module>


http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/README.md
----------------------------------------------------------------------
diff --git a/streaming-pubsub/README.md b/streaming-pubsub/README.md
new file mode 100644
index 0000000..faf9826
--- /dev/null
+++ b/streaming-pubsub/README.md
@@ -0,0 +1,45 @@
+A library for reading data from [Google Cloud Pub/Sub](https://cloud.google.com/pubsub/) using Spark Streaming.
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % "2.2.0-SNAPSHOT"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-pubsub_2.11</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-pubsub_2.11:2.2.0-SNAPSHOT
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+## Examples
+
+First you need to create a credential using `SparkGCPCredentials`; it supports four types of credentials:
+* application default
+    `SparkGCPCredentials.builder.build()`
+* JSON-type service account
+    `SparkGCPCredentials.builder.jsonServiceAccount(PATH_TO_JSON_KEY).build()`
+* P12-type service account
+    `SparkGCPCredentials.builder.p12ServiceAccount(PATH_TO_P12_KEY, EMAIL_ACCOUNT).build()`
+* metadata service account (running on Dataproc)
+    `SparkGCPCredentials.builder.metadataServiceAccount().build()`
+
+### Scala API
+
+    val lines = PubsubUtils.createStream(ssc, projectId, subscriptionName, credential, ..)
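For illustration, a fuller Scala sketch adapted from the `PubsubWordCount` example included in this commit; the project id and subscription name are placeholders, and application-default credentials are assumed:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}

    val sparkConf = new SparkConf().setAppName("PubsubWordCount")
    val ssc = new StreamingContext(sparkConf, Milliseconds(2000))

    // Application-default credentials; use jsonServiceAccount(...) or
    // p12ServiceAccount(...) on the builder for key-file based auth.
    val credential = SparkGCPCredentials.builder.build()

    // topic = None means the subscription must already exist.
    val messages = PubsubUtils.createStream(
      ssc, "your-project-id", None, "your-subscription",
      credential, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Count words carried in the message payloads and print each batch.
    messages.map(msg => (new String(msg.getData()), 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()

Passing `Some(topicName)` instead of `None` makes the receiver create the subscription on start if it does not already exist.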
+ +### Java API + + JavaDStream lines = PubsubUtils.createStream(jssc, projectId, subscriptionName, credential...) + +See end-to-end examples at [Google Cloud Pubsub Examples](streaming-pubsub/examples) http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala ---------------------------------------------------------------------- diff --git a/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala b/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala new file mode 100644 index 0000000..00f1fa1 --- /dev/null +++ b/streaming-pubsub/examples/src/main/scala/org.apache.spark.examples.streaming.pubsub/PubsubWordCount.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.streaming.pubsub + +import scala.collection.JavaConverters._ +import scala.util.Random + +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport +import com.google.api.client.json.jackson2.JacksonFactory +import com.google.api.services.pubsub.Pubsub.Builder +import com.google.api.services.pubsub.model.PublishRequest +import com.google.api.services.pubsub.model.PubsubMessage +import com.google.cloud.hadoop.util.RetryHttpInitializer + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.pubsub.ConnectionUtils +import org.apache.spark.streaming.pubsub.PubsubTestUtils +import org.apache.spark.streaming.pubsub.PubsubUtils +import org.apache.spark.streaming.pubsub.SparkGCPCredentials +import org.apache.spark.streaming.pubsub.SparkPubsubMessage +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.SparkConf + + +/** + * Consumes messages from a Google Cloud Pub/Sub subscription and does wordcount. 
+ * In this example it use application default credentials, so need to use gcloud + * client to generate token file before running example + * + * Usage: PubsubWordCount + * is the name of Google cloud + * is the subscription to a topic + * + * Example: + * # use gcloud client generate token file + * $ gcloud init + * $ gcloud auth application-default login + * + * # run the example + * $ bin/run-example \ + * org.apache.spark.examples.streaming.pubsub.PubsubWordCount project_1 subscription_1 + * + */ +object PubsubWordCount { + def main(args: Array[String]): Unit = { + if (args.length != 2) { + System.err.println( + """ + |Usage: PubsubWordCount + | + | is the name of Google cloud + | is the subscription to a topic + | + """.stripMargin) + System.exit(1) + } + + val Seq(projectId, subscription) = args.toSeq + + val sparkConf = new SparkConf().setAppName("PubsubWordCount") + val ssc = new StreamingContext(sparkConf, Milliseconds(2000)) + + val pubsubStream: ReceiverInputDStream[SparkPubsubMessage] = PubsubUtils.createStream( + ssc, projectId, None, subscription, + SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2) + + val wordCounts = + pubsubStream.map(message => (new String(message.getData()), 1)).reduceByKey(_ + _) + + wordCounts.print() + + ssc.start() + ssc.awaitTermination() + } + +} + +/** + * A Pub/Sub publisher for demonstration purposes, publishes message in 10 batches(seconds), + * you can set the size of messages in each batch by , + * and each message will contains only one word in this list + * ("google", "cloud", "pubsub", "say", "hello") + * + * Usage: PubsubPublisher + * + * is the name of Google cloud + * is the topic of Google cloud Pub/Sub + * is the rate of records per second to put onto the stream + * + * Example: + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.pubsub.PubsubPublisher project_1 topic_1 10` + */ +object PubsubPublisher { + def main(args: Array[String]): Unit = { + if (args.length != 3) { + System.err.println( + """ + |Usage: PubsubPublisher + | + | is the name of Google cloud + | is the topic of Google cloud Pub/Sub + | is the rate of records per second to put onto the topic + | + """.stripMargin) + System.exit(1) + } + + val Seq(projectId, topic, recordsPerSecond) = args.toSeq + + val APP_NAME = this.getClass.getSimpleName + + val client = new Builder( + GoogleNetHttpTransport.newTrustedTransport(), + JacksonFactory.getDefaultInstance(), + new RetryHttpInitializer( + SparkGCPCredentials.builder.build().provider, + APP_NAME + )) + .setApplicationName(APP_NAME) + .build() + + val randomWords = List("google", "cloud", "pubsub", "say", "hello") + val publishRequest = new PublishRequest() + for (i <- 1 to 10) { + val messages = (1 to recordsPerSecond.toInt).map { recordNum => + val randomWordIndex = Random.nextInt(randomWords.size) + new PubsubMessage().encodeData(randomWords(randomWordIndex).getBytes()) + } + publishRequest.setMessages(messages.asJava) + client.projects().topics() + .publish(s"projects/$projectId/topics/$topic", publishRequest) + .execute() + println(s"Published data. 
topic: $topic; Mesaage: $publishRequest") + + Thread.sleep(1000) + } + + } +} +// scalastyle:on http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-pubsub/pom.xml b/streaming-pubsub/pom.xml new file mode 100644 index 0000000..c3da90f --- /dev/null +++ b/streaming-pubsub/pom.xml @@ -0,0 +1,86 @@ + + + + + 4.0.0 + + bahir-parent_2.11 + org.apache.bahir + 2.2.0-SNAPSHOT + ../pom.xml + + + org.apache.bahir + spark-streaming-pubsub_2.11 + + streaming-pubsub + + jar + Apache Bahir - Spark Streaming Google PubSub + http://bahir.apache.org/ + + + + org.apache.spark + spark-tags_${scala.binary.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${spark.version} + provided + + + com.google.apis + google-api-services-pubsub + v1-rev355-1.22.0 + + + com.google.cloud.bigdataoss + util + 1.6.0 + + + com.google.cloud.bigdataoss + util-hadoop + 1.6.0-hadoop2 + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + test-jar + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.apache.maven.plugins + maven-source-plugin + + + + http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala new file mode 100644 index 0000000..e769f2e --- /dev/null +++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.pubsub + +import java.io.{Externalizable, ObjectInput, ObjectOutput} + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import com.google.api.client.auth.oauth2.Credential +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport +import com.google.api.client.googleapis.json.GoogleJsonResponseException +import com.google.api.client.json.jackson2.JacksonFactory +import com.google.api.services.pubsub.Pubsub.Builder +import com.google.api.services.pubsub.model.{AcknowledgeRequest, PubsubMessage, PullRequest} +import com.google.api.services.pubsub.model.Subscription +import com.google.cloud.hadoop.util.RetryHttpInitializer + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.util.Utils + +/** + * Input stream that subscribe messages from Google cloud Pub/Sub subscription. + * @param project Google cloud project id + * @param topic Topic name for creating subscription if need + * @param subscription Pub/Sub subscription name + * @param credential Google cloud project credential to access Pub/Sub service + */ +private[streaming] +class PubsubInputDStream( + _ssc: StreamingContext, + val project: String, + val topic: Option[String], + val subscription: String, + val credential: SparkGCPCredentials, + val _storageLevel: StorageLevel +) extends ReceiverInputDStream[SparkPubsubMessage](_ssc) { + + override def getReceiver(): Receiver[SparkPubsubMessage] = { + new PubsubReceiver(project, topic, subscription, credential, _storageLevel) + } +} + +/** + * A wrapper class for PubsubMessage's with a custom serialization format. + * + * This is necessary because PubsubMessage uses inner data structures + * which are not serializable. 
+ */ +class SparkPubsubMessage() extends Externalizable { + + private[pubsub] var message = new PubsubMessage + + def getData(): Array[Byte] = message.decodeData() + + def getAttributes(): java.util.Map[String, String] = message.getAttributes + + def getMessageId(): String = message.getMessageId + + def getPublishTime(): String = message.getPublishTime + + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { + message.decodeData() match { + case null => out.writeInt(-1) + case data => + out.writeInt(data.size) + out.write(data) + } + + message.getMessageId match { + case null => out.writeInt(-1) + case id => + val idBuff = Utils.serialize(id) + out.writeInt(idBuff.length) + out.write(idBuff) + } + + message.getPublishTime match { + case null => out.writeInt(-1) + case time => + val publishTimeBuff = Utils.serialize(time) + out.writeInt(publishTimeBuff.length) + out.write(publishTimeBuff) + } + + message.getAttributes match { + case null => out.writeInt(-1) + case attrs => + out.writeInt(attrs.size()) + for ((k, v) <- message.getAttributes.asScala) { + val keyBuff = Utils.serialize(k) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } + } + + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { + in.readInt() match { + case -1 => message.encodeData(null) + case bodyLength => + val data = new Array[Byte](bodyLength) + in.readFully(data) + message.encodeData(data) + } + + in.readInt() match { + case -1 => message.setMessageId(null) + case idLength => + val idBuff = new Array[Byte](idLength) + in.readFully(idBuff) + val id: String = Utils.deserialize(idBuff) + message.setMessageId(id) + } + + in.readInt() match { + case -1 => message.setPublishTime(null) + case publishTimeLength => + val publishTimeBuff = new Array[Byte](publishTimeLength) + in.readFully(publishTimeBuff) + val publishTime: String = Utils.deserialize(publishTimeBuff) + message.setPublishTime(publishTime) + } + + in.readInt() match { + case -1 => message.setAttributes(null) + case numAttributes => + val attributes = new java.util.HashMap[String, String] + for (i <- 0 until numAttributes) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.readFully(keyBuff) + val key: String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.readFully(valBuff) + val value: String = Utils.deserialize(valBuff) + + attributes.put(key, value) + } + message.setAttributes(attributes) + } + } +} + +private [pubsub] +object ConnectionUtils { + val transport = GoogleNetHttpTransport.newTrustedTransport(); + val jacksonFactory = JacksonFactory.getDefaultInstance; + + // The topic or subscription already exists. + // This is an error on creation operations. 
+ val ALREADY_EXISTS = 409 + + /** + * Client can retry with these response status + */ + val RESOURCE_EXHAUSTED = 429 + + val CANCELLED = 499 + + val INTERNAL = 500 + + val UNAVAILABLE = 503 + + val DEADLINE_EXCEEDED = 504 + + def retryable(status: Int): Boolean = { + status match { + case RESOURCE_EXHAUSTED | CANCELLED | INTERNAL | UNAVAILABLE | DEADLINE_EXCEEDED => true + case _ => false + } + } +} + + +private[pubsub] +class PubsubReceiver( + project: String, + topic: Option[String], + subscription: String, + credential: SparkGCPCredentials, + storageLevel: StorageLevel) + extends Receiver[SparkPubsubMessage](storageLevel) { + + val APP_NAME = "sparkstreaming-pubsub-receiver" + + val INIT_BACKOFF = 100 // 100ms + + val MAX_BACKOFF = 10 * 1000 // 10s + + val MAX_MESSAGE = 1000 + + lazy val client = new Builder( + ConnectionUtils.transport, + ConnectionUtils.jacksonFactory, + new RetryHttpInitializer(credential.provider, APP_NAME)) + .setApplicationName(APP_NAME) + .build() + + val projectFullName: String = s"projects/$project" + val subscriptionFullName: String = s"$projectFullName/subscriptions/$subscription" + + override def onStart(): Unit = { + topic match { + case Some(t) => + val sub: Subscription = new Subscription + sub.setTopic(s"$projectFullName/topics/$t") + try { + client.projects().subscriptions().create(subscriptionFullName, sub).execute() + } catch { + case e: GoogleJsonResponseException => + if (e.getDetails.getCode == ConnectionUtils.ALREADY_EXISTS) { + // Ignore subscription already exists exception. + } else { + reportError("Failed to create subscription", e) + } + case NonFatal(e) => + reportError("Failed to create subscription", e) + } + case None => // do nothing + } + new Thread() { + override def run() { + receive() + } + }.start() + } + + def receive(): Unit = { + val pullRequest = new PullRequest().setMaxMessages(MAX_MESSAGE).setReturnImmediately(false) + var backoff = INIT_BACKOFF + while (!isStopped()) { + try { + val pullResponse = + client.projects().subscriptions().pull(subscriptionFullName, pullRequest).execute() + val receivedMessages = pullResponse.getReceivedMessages.asScala.toList + store(receivedMessages + .map(x => { + val sm = new SparkPubsubMessage + sm.message = x.getMessage + sm + }) + .iterator) + + val ackRequest = new AcknowledgeRequest() + ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava) + client.projects().subscriptions().acknowledge(subscriptionFullName, ackRequest).execute() + backoff = INIT_BACKOFF + } catch { + case e: GoogleJsonResponseException => + if (ConnectionUtils.retryable(e.getDetails.getCode)) { + Thread.sleep(backoff) + backoff = Math.min(backoff * 2, MAX_BACKOFF) + } else { + reportError("Failed to pull messages", e) + } + case NonFatal(e) => reportError("Failed to pull messages", e) + } + } + } + + override def onStop(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala new file mode 100644 index 0000000..b4f02b9 --- /dev/null +++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubUtils.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.pubsub + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +object PubsubUtils { + + /** + * Create an input stream that receives messages pushed by a Pub/Sub publisher + * using service account authentication + * + * If topic is given, and the subscription doesn't exist, + * create subscription by the given name. + * Note: This Receiver will only receive the message arrived after the subscription created. + * If topic is not given, throw not found exception when it doesn't exist + * + * @param ssc StreamingContext object + * @param project Google cloud project id + * @param topic Topic name for creating subscription if need + * @param subscription Subscription name to subscribe to + * @param credentials SparkGCPCredentials to use for authenticating + * @param storageLevel RDD storage level + * @return + */ + def createStream( + ssc: StreamingContext, + project: String, + topic: Option[String], + subscription: String, + credentials: SparkGCPCredentials, + storageLevel: StorageLevel): ReceiverInputDStream[SparkPubsubMessage] = { + ssc.withNamedScope("pubsub stream") { + + new PubsubInputDStream( + ssc, + project, + topic, + subscription, + credentials, + storageLevel) + } + } + + /** + * Create an input stream that receives messages pushed by a Pub/Sub publisher + * using given credential + * + * Throw not found exception if the subscription doesn't exist + * + * @param jssc JavaStreamingContext object + * @param project Google cloud project id + * @param subscription Subscription name to subscribe to + * @param credentials SparkGCPCredentials to use for authenticating + * @param storageLevel RDD storage level + * @return + */ + def createStream(jssc: JavaStreamingContext, project: String, subscription: String, + credentials: SparkGCPCredentials, storageLevel: StorageLevel + ): JavaReceiverInputDStream[SparkPubsubMessage] = { + createStream(jssc.ssc, project, None, subscription, credentials, storageLevel) + } + + /** + * Create an input stream that receives messages pushed by a Pub/Sub publisher + * using given credential + * + * If the subscription doesn't exist, create subscription by the given name. + * Note: This Receiver will only receive the message arrived after the subscription created. 
+ * + * @param jssc JavaStreamingContext object + * @param project Google cloud project id + * @param topic Topic name for creating subscription if need + * @param subscription Subscription name to subscribe to + * @param credentials SparkGCPCredentials to use for authenticating + * @param storageLevel RDD storage level + * @return + */ + def createStream(jssc: JavaStreamingContext, + project: String, topic: String, subscription: String, + credentials: SparkGCPCredentials, storageLevel: StorageLevel + ): JavaReceiverInputDStream[SparkPubsubMessage] = { + createStream(jssc.ssc, project, Some(topic), subscription, credentials, storageLevel) + } +} + http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala new file mode 100644 index 0000000..5cadde3 --- /dev/null +++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.pubsub + +import com.google.api.client.auth.oauth2.Credential +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential +import com.google.api.services.pubsub.PubsubScopes +import com.google.cloud.hadoop.util.{EntriesCredentialConfiguration, HadoopCredentialConfiguration} +import java.util +import org.apache.hadoop.conf.Configuration + +/** + * Serializable interface providing a method executors can call to obtain an + * GCPCredentialsProvider instance for authenticating to GCP services. + */ +private[pubsub] sealed trait SparkGCPCredentials extends Serializable { + + def provider: Credential +} + +/** + * Returns application default type credential + */ +private[pubsub] final case object ApplicationDefaultCredentials extends SparkGCPCredentials { + + override def provider: Credential = { + GoogleCredential.getApplicationDefault.createScoped(PubsubScopes.all()) + } +} + +/** + * Returns a Service Account type Credential instance. 
+ * If all parameters are None, then try metadata service type + * If jsonFilePath available, try json type + * If jsonFilePath is None and p12FilePath and emailAccount available, try p12 type + * + * @param jsonFilePath file path for json + * @param p12FilePath file path for p12 + * @param emailAccount email account for p12 + */ +private[pubsub] final case class ServiceAccountCredentials( + jsonFilePath: Option[String] = None, + p12FilePath: Option[String] = None, + emailAccount: Option[String] = None) + extends SparkGCPCredentials { + + override def provider: Credential = { + val conf = new Configuration(false) + conf.setBoolean( + EntriesCredentialConfiguration.BASE_KEY_PREFIX + + EntriesCredentialConfiguration.ENABLE_SERVICE_ACCOUNTS_SUFFIX, + true) + jsonFilePath match { + case Some(jsonFilePath) => + conf.set( + EntriesCredentialConfiguration.BASE_KEY_PREFIX + + EntriesCredentialConfiguration.JSON_KEYFILE_SUFFIX, + jsonFilePath + ) + case _ => // do nothing + } + p12FilePath match { + case Some(p12FilePath) => + conf.set( + EntriesCredentialConfiguration.BASE_KEY_PREFIX + + EntriesCredentialConfiguration.SERVICE_ACCOUNT_KEYFILE_SUFFIX, + p12FilePath + ) + case _ => // do nothing + } + emailAccount match { + case Some(emailAccount) => + conf.set( + EntriesCredentialConfiguration.BASE_KEY_PREFIX + + EntriesCredentialConfiguration.SERVICE_ACCOUNT_EMAIL_SUFFIX, + emailAccount + ) + case _ => // do nothing + } + + HadoopCredentialConfiguration + .newBuilder() + .withConfiguration(conf) + .build() + .getCredential(new util.ArrayList(PubsubScopes.all())) + } + +} + +object SparkGCPCredentials { + + /** + * Builder for SparkGCPCredentials instance. + */ + class Builder { + private var creds: Option[SparkGCPCredentials] = None + + /** + * Use a json type key file for service account credential + * + * @param jsonFilePath json type key file + * @return Reference to this SparkGCPCredentials.Builder + */ + def jsonServiceAccount(jsonFilePath: String): Builder = { + creds = Option(ServiceAccountCredentials(Option(jsonFilePath))) + this + } + + /** + * Use a p12 type key file service account credential + * + * @param p12FilePath p12 type key file + * @param emailAccount email of service account + * @return Reference to this SparkGCPCredentials.Builder + */ + def p12ServiceAccount(p12FilePath: String, emailAccount: String): Builder = { + creds = Option(ServiceAccountCredentials( + p12FilePath = Option(p12FilePath), emailAccount = Option(emailAccount))) + this + } + + /** + * Use a meta data service to return service account + * @return Reference to this SparkGCPCredentials.Builder + */ + def metadataServiceAccount(): Builder = { + creds = Option(ServiceAccountCredentials()) + this + } + + /** + * Returns the appropriate instance of SparkGCPCredentials given the configured + * parameters. + * + * - The service account credentials will be returned if they were provided. + * + * - The application default credentials will be returned otherwise. + * @return + */ + def build(): SparkGCPCredentials = creds.getOrElse(ApplicationDefaultCredentials) + + } + + /** + * Creates a SparkGCPCredentials.Builder for constructing + * SparkGCPCredentials instance. 
+ * + * @return SparkGCPCredentials.Builder instance + */ + def builder: Builder = new Builder +} http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000..cfedb5a --- /dev/null +++ b/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.SparkConf; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + SparkConf conf = new SparkConf() + .setMaster("local[2]") + .setAppName("test") + .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); + ssc = new JavaStreamingContext(conf, new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java b/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java new file mode 100644 index 0000000..360b9a9 --- /dev/null +++ b/streaming-pubsub/src/test/java/org/apache/spark/streaming/pubsub/JavaPubsubStreamSuite.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.pubsub; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.junit.Test; + +public class JavaPubsubStreamSuite extends LocalJavaStreamingContext { + @Test + public void testPubsubStream() { + // tests the API, does not actually test data receiving + JavaReceiverInputDStream stream1 = PubsubUtils.createStream( + ssc, "project", "subscription", + new SparkGCPCredentials.Builder().build(), StorageLevel.MEMORY_AND_DISK_SER_2()); + + JavaReceiverInputDStream stream2 = PubsubUtils.createStream( + ssc, "project", "topic", "subscription", + new SparkGCPCredentials.Builder().build(), StorageLevel.MEMORY_AND_DISK_SER_2()); + + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/test/resources/log4j.properties b/streaming-pubsub/src/test/resources/log4j.properties new file mode 100644 index 0000000..75e3b53 --- /dev/null +++ b/streaming-pubsub/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark-project.jetty=WARN + http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala new file mode 100644 index 0000000..acdceb7 --- /dev/null +++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.pubsub + +import org.apache.spark.SparkFunSuite + +/** + * Helper class that runs Google Cloud Pub/Sub real data transfer tests of + * ignores them based on env variable is set or not. + */ +trait PubsubFunSuite extends SparkFunSuite { + import PubsubTestUtils._ + + /** Run the test if environment variable is set or ignore the test */ + def testIfEnabled(testName: String)(testBody: => Unit) { + if (shouldRunTests) { + test(testName)(testBody) + } else { + ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody) + } + } + + /** Run the give body of code only if Kinesis tests are enabled */ + def runIfTestsEnabled(message: String)(body: => Unit): Unit = { + if (shouldRunTests) { + body + } else { + ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(()) + } + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala new file mode 100644 index 0000000..284950c --- /dev/null +++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.pubsub + +import java.util.UUID + +import scala.concurrent.duration._ + +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Eventually + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.SparkFunSuite +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Seconds + +class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAfter { + + val batchDuration = Seconds(1) + + private val master: String = "local[2]" + + private val appName: String = this.getClass.getSimpleName + + private val topicName: String = s"bahirStreamTestTopic_${UUID.randomUUID()}" + + private val subscriptionName: String = s"${topicName}_sub" + + private val subForCreateName: String = s"${topicName}_create_me" + + private var ssc: StreamingContext = null + private var pubsubTestUtils: PubsubTestUtils = null + private var topicFullName: String = null + private var subscriptionFullName: String = null + private var subForCreateFullName: String = null + + override def beforeAll(): Unit = { + runIfTestsEnabled("Prepare PubsubTestUtils") { + pubsubTestUtils = new PubsubTestUtils + topicFullName = pubsubTestUtils.getFullTopicPath(topicName) + subscriptionFullName = pubsubTestUtils.getFullSubscriptionPath(subscriptionName) + subForCreateFullName = pubsubTestUtils.getFullSubscriptionPath(subForCreateName) + pubsubTestUtils.createTopic(topicFullName) + pubsubTestUtils.createSubscription(topicFullName, subscriptionFullName) + } + } + + override def afterAll(): Unit = { + if (pubsubTestUtils != null) { + pubsubTestUtils.removeSubscription(subForCreateFullName) + pubsubTestUtils.removeSubscription(subscriptionFullName) + pubsubTestUtils.removeTopic(topicFullName) + } + } + + before { + ssc = new StreamingContext(master, appName, batchDuration) + } + + after { + if (ssc != null) { + ssc.stop() + } + } + + test("PubsubUtils API") { + val pubsubStream1 = PubsubUtils.createStream( + ssc, "project", None, "subscription", + PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2) + + val pubsubStream2 = PubsubUtils.createStream( + ssc, "project", Some("topic"), "subscription", + PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2) + } + + testIfEnabled("pubsub input stream") { + val receiveStream = PubsubUtils.createStream( + ssc, PubsubTestUtils.projectId, Some(topicName), subscriptionName, + PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2) + + @volatile var receiveMessages: List[SparkPubsubMessage] = List() + receiveStream.foreachRDD { rdd => + if (rdd.collect().length > 0) { + receiveMessages = receiveMessages ::: List(rdd.first) + receiveMessages + } + } + + ssc.start() + + eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { + val sendMessages = pubsubTestUtils.generatorMessages(10) + pubsubTestUtils.publishData(topicFullName, sendMessages) + assert(sendMessages.map(m => new String(m.getData)) + .contains(new String(receiveMessages(0).getData))) + assert(sendMessages.map(_.getAttributes).contains(receiveMessages(0).getAttributes)) + } + } + + testIfEnabled("pubsub input stream, create pubsub") { + val receiveStream = PubsubUtils.createStream( + ssc, PubsubTestUtils.projectId, Some(topicName), subForCreateName, + PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2) + + @volatile var receiveMessages: List[SparkPubsubMessage] = List() + receiveStream.foreachRDD { rdd => + if (rdd.collect().length > 0) { + receiveMessages = receiveMessages ::: 
List(rdd.first) + receiveMessages + } + } + + ssc.start() + + eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { + val sendMessages = pubsubTestUtils.generatorMessages(10) + pubsubTestUtils.publishData(topicFullName, sendMessages) + assert(sendMessages.map(m => new String(m.getData)) + .contains(new String(receiveMessages(0).getData))) + assert(sendMessages.map(_.getAttributes).contains(receiveMessages(0).getAttributes)) + } + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala new file mode 100644 index 0000000..9dd719a --- /dev/null +++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.pubsub + +import scala.collection.JavaConverters._ + +import com.google.api.services.pubsub.Pubsub +import com.google.api.services.pubsub.Pubsub.Builder +import com.google.api.services.pubsub.model.PublishRequest +import com.google.api.services.pubsub.model.PubsubMessage +import com.google.api.services.pubsub.model.Subscription +import com.google.api.services.pubsub.model.Topic +import com.google.cloud.hadoop.util.RetryHttpInitializer + +import org.apache.spark.internal.Logging + +private[pubsub] class PubsubTestUtils extends Logging { + + val APP_NAME = this.getClass.getSimpleName + + val client: Pubsub = { + new Builder( + ConnectionUtils.transport, + ConnectionUtils.jacksonFactory, + new RetryHttpInitializer( + PubsubTestUtils.credential.provider, + APP_NAME + )) + .setApplicationName(APP_NAME) + .build() + } + + def createTopic(topic: String): Unit = { + val topicRequest = new Topic() + client.projects().topics().create(topic, topicRequest.setName(topic)).execute() + } + + def createSubscription(topic: String, subscription: String): Unit = { + val subscriptionRequest = new Subscription() + client.projects().subscriptions().create(subscription, + subscriptionRequest.setTopic(topic).setName(subscription)).execute() + } + + def publishData(topic: String, messages: List[SparkPubsubMessage]): Unit = { + val publishRequest = new PublishRequest() + publishRequest.setMessages(messages.map(m => m.message).asJava) + client.projects().topics().publish(topic, publishRequest).execute() + } + + def removeSubscription(subscription: String): Unit = { + client.projects().subscriptions().delete(subscription).execute() + } + + def removeTopic(topic: String): Unit = { + client.projects().topics().delete(topic).execute() + } + + def generatorMessages(num: Int): List[SparkPubsubMessage] = { + (1 to num) + .map(n => { + val m = new PubsubMessage() + m.encodeData(s"data$n".getBytes) + m.setAttributes(Map("a1" -> s"v1$n", "a2" -> s"v2$n").asJava) + }) + .map(m => { + val sm = new SparkPubsubMessage() + sm.message = m + sm + }) + .toList + } + + def getFullTopicPath(topic: String): String = + s"projects/${PubsubTestUtils.projectId}/topics/$topic" + + def getFullSubscriptionPath(subscription: String): String = + s"projects/${PubsubTestUtils.projectId}/subscriptions/$subscription" + +} + +private[pubsub] object PubsubTestUtils { + + val envVarNameForEnablingTests = "ENABLE_PUBSUB_TESTS" + val envVarNameForGoogleCloudProjectId = "GCP_TEST_PROJECT_ID" + val envVarNameForJsonKeyPath = "GCP_TEST_JSON_KEY_PATH" + val envVarNameForP12KeyPath = "GCP_TEST_P12_KEY_PATH" + val envVarNameForAccount = "GCP_TEST_ACCOUNT" + + lazy val shouldRunTests = { + val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1") + if (isEnvSet) { + // scalastyle:off println + // Print this so that they are easily visible on the console and not hidden in the log4j logs. + println( + s""" + |Google Pub/Sub tests that actually send data has been enabled by setting the environment + |variable $envVarNameForEnablingTests to 1. + |This will create Pub/Sub Topics and Subscriptions in Google cloud platform. + |Please be aware that this may incur some Google cloud costs. + |Set the environment variable $envVarNameForGoogleCloudProjectId to the desired project. 
+ """.stripMargin) + // scalastyle:on println + } + isEnvSet + } + + lazy val projectId = { + val id = sys.env.getOrElse(envVarNameForGoogleCloudProjectId, + throw new IllegalArgumentException( + s"Need to set environment varibable $envVarNameForGoogleCloudProjectId if enable test.")) + // scalastyle:off println + // Print this so that they are easily visible on the console and not hidden in the log4j logs. + println(s"Using project $id for creating Pub/Sub topic and subscription for tests.") + // scalastyle:on println + id + } + + lazy val credential = + sys.env.get(envVarNameForJsonKeyPath) + .map(path => SparkGCPCredentials.builder.jsonServiceAccount(path).build()) + .getOrElse( + sys.env.get(envVarNameForP12KeyPath) + .map(path => SparkGCPCredentials.builder.p12ServiceAccount( + path, sys.env.get(envVarNameForAccount).get + ).build()) + .getOrElse(SparkGCPCredentials.builder.build())) +} http://git-wip-us.apache.org/repos/asf/bahir/blob/56613263/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala new file mode 100644 index 0000000..e47b0b2 --- /dev/null +++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.pubsub + +import java.io.FileNotFoundException + +import org.scalatest.concurrent.Timeouts + +import org.apache.spark.util.Utils +import org.apache.spark.SparkFunSuite + +class SparkGCPCredentialsBuilderSuite extends SparkFunSuite with Timeouts { + private def builder = SparkGCPCredentials.builder + + private val jsonCreds = ServiceAccountCredentials( + jsonFilePath = Option("json-key-path") + ) + + private val p12Creds = ServiceAccountCredentials( + p12FilePath = Option("p12-key-path"), + emailAccount = Option("email") + ) + + private val metadataCreds = ServiceAccountCredentials() + + test("should build application default") { + assert(builder.build() === ApplicationDefaultCredentials) + } + + test("should build json service account") { + assertResult(jsonCreds) { + builder.jsonServiceAccount(jsonCreds.jsonFilePath.get).build() + } + } + + test("should provide json creds") { + val thrown = intercept[FileNotFoundException] { + jsonCreds.provider + } + assert(thrown.getMessage === "json-key-path (No such file or directory)") + } + + test("should build p12 service account") { + assertResult(p12Creds) { + builder.p12ServiceAccount(p12Creds.p12FilePath.get, p12Creds.emailAccount.get).build() + } + } + + test("should provide p12 creds") { + val thrown = intercept[FileNotFoundException] { + p12Creds.provider + } + assert(thrown.getMessage === "p12-key-path (No such file or directory)") + } + + test("should build metadata service account") { + assertResult(metadataCreds) { + builder.metadataServiceAccount().build() + } + } + + test("SparkGCPCredentials classes should be serializable") { + assertResult(jsonCreds) { + Utils.deserialize[ServiceAccountCredentials](Utils.serialize(jsonCreds)) + } + + assertResult(p12Creds) { + Utils.deserialize[ServiceAccountCredentials](Utils.serialize(p12Creds)) + } + + assertResult(metadataCreds) { + Utils.deserialize[ServiceAccountCredentials](Utils.serialize(metadataCreds)) + } + + assertResult(ApplicationDefaultCredentials) { + Utils.deserialize[ServiceAccountCredentials](Utils.serialize(ApplicationDefaultCredentials)) + } + } + +}