Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9CFBB200B73 for ; Mon, 29 Aug 2016 14:05:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9B68B160AB8; Mon, 29 Aug 2016 12:05:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 94B13160AA7 for ; Mon, 29 Aug 2016 14:05:27 +0200 (CEST) Received: (qmail 50478 invoked by uid 500); 29 Aug 2016 12:05:26 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 50469 invoked by uid 99); 29 Aug 2016 12:05:26 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Aug 2016 12:05:26 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 60E0B180481 for ; Mon, 29 Aug 2016 12:05:26 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id CpPDdsDgepL5 for ; Mon, 29 Aug 2016 12:05:22 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id E39245FAEF for ; Mon, 29 Aug 2016 12:05:21 +0000 (UTC) Received: (qmail 50314 invoked by uid 99); 29 Aug 2016 12:05:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Aug 2016 12:05:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5DCBBDFF4E; Mon, 29 Aug 2016 12:05:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbonofre@apache.org To: commits@beam.incubator.apache.org Date: Mon, 29 Aug 2016 12:05:21 -0000 Message-Id: <883b4db1f9194f76beed33af933c4e24@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-beam git commit: [BEAM-313] Provide a context for SparkRunner archived-at: Mon, 29 Aug 2016 12:05:28 -0000 Repository: incubator-beam Updated Branches: refs/heads/master 676843e04 -> 3666c22cb [BEAM-313] Provide a context for SparkRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/017da7ba Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/017da7ba Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/017da7ba Branch: refs/heads/master Commit: 017da7bac3e844ef7391aabbcbaf86c9c99af968 Parents: 676843e Author: Abbass MAROUNI Authored: Mon Aug 29 13:28:46 2016 +0200 Committer: Abbass MAROUNI Committed: Mon Aug 29 13:28:46 2016 +0200 ---------------------------------------------------------------------- .../runners/spark/SparkPipelineOptions.java | 13 ++ .../apache/beam/runners/spark/SparkRunner.java | 16 ++- .../runners/spark/ProvidedSparkContextTest.java | 138 +++++++++++++++++++ 3 files changed, 164 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/017da7ba/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index be4f7f0..db6b75c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -18,11 +18,14 @@ package org.apache.beam.runners.spark; +import com.fasterxml.jackson.annotation.JsonIgnore; + import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.spark.api.java.JavaSparkContext; /** * Spark runner pipeline options. @@ -49,4 +52,14 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions, @Default.Boolean(true) Boolean getEnableSparkSinks(); void setEnableSparkSinks(Boolean enableSparkSinks); + + @Description("If the spark runner will be initialized with a provided Spark Context") + @Default.Boolean(false) + boolean getUsesProvidedSparkContext(); + void setUsesProvidedSparkContext(boolean value); + + @Description("Provided Java Spark Context") + @JsonIgnore + JavaSparkContext getProvidedSparkContext(); + void setProvidedSparkContext(JavaSparkContext jsc); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/017da7ba/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index fa85a2e..9f1a839 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -143,9 +143,19 @@ public final class SparkRunner extends PipelineRunner { public EvaluationResult run(Pipeline pipeline) { try { LOG.info("Executing pipeline using the SparkRunner."); - JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(), - mOptions.getAppName()); - + JavaSparkContext jsc; + if (mOptions.getUsesProvidedSparkContext()) { + LOG.info("Using a provided Spark Context"); + jsc = mOptions.getProvidedSparkContext(); + if (jsc == null || jsc.sc().isStopped()){ + LOG.error("The provided Spark context " + + jsc + " was not created or was stopped"); + throw new RuntimeException("The provided Spark context was not created or was stopped"); + } + } else { + LOG.info("Creating a new Spark Context"); + jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(), mOptions.getAppName()); + } if (mOptions.isStreaming()) { SparkPipelineTranslator translator = new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/017da7ba/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java new file mode 100644 index 0000000..cbc5976 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import static org.junit.Assert.fail; + +import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import org.apache.beam.runners.spark.examples.WordCount; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.Test; + +/** + * Provided Spark Context tests. + */ +public class ProvidedSparkContextTest { + private static final String[] WORDS_ARRAY = { + "hi there", "hi", "hi sue bob", + "hi sue", "", "bob hi"}; + private static final List WORDS = Arrays.asList(WORDS_ARRAY); + private static final Set EXPECTED_COUNT_SET = + ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); + private static final String PROVIDED_CONTEXT_EXCEPTION = + "The provided Spark context was not created or was stopped"; + + /** + * Provide a context and call pipeline run. + * @throws Exception + */ + @Test + public void testWithProvidedContext() throws Exception { + JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); + + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + options.setRunner(SparkRunner.class); + options.setUsesProvidedSparkContext(true); + options.setProvidedSparkContext(jsc); + + Pipeline p = Pipeline.create(options); + PCollection inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder + .of())); + PCollection output = inputWords.apply(new WordCount.CountWords()) + .apply(MapElements.via(new WordCount.FormatAsTextFn())); + + PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); + + // Run test from pipeline + p.run(); + + jsc.stop(); + } + + /** + * Provide a context and call pipeline run. + * @throws Exception + */ + @Test + public void testWithNullContext() throws Exception { + JavaSparkContext jsc = null; + + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + options.setRunner(SparkRunner.class); + options.setUsesProvidedSparkContext(true); + options.setProvidedSparkContext(jsc); + + Pipeline p = Pipeline.create(options); + PCollection inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder + .of())); + PCollection output = inputWords.apply(new WordCount.CountWords()) + .apply(MapElements.via(new WordCount.FormatAsTextFn())); + + PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); + + try { + p.run(); + fail("Should throw an exception when The provided Spark context is null"); + } catch (RuntimeException e){ + assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION)); + } + } + + /** + * A SparkRunner with a stopped provided Spark context cannot run pipelines. + * @throws Exception + */ + @Test + public void testWithStoppedProvidedContext() throws Exception { + JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); + // Stop the provided Spark context directly + jsc.stop(); + + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + options.setRunner(SparkRunner.class); + options.setUsesProvidedSparkContext(true); + options.setProvidedSparkContext(jsc); + + Pipeline p = Pipeline.create(options); + PCollection inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder + .of())); + PCollection output = inputWords.apply(new WordCount.CountWords()) + .apply(MapElements.via(new WordCount.FormatAsTextFn())); + + PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); + + try { + p.run(); + fail("Should throw an exception when The provided Spark context is stopped"); + } catch (RuntimeException e){ + assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION)); + } + } + +}