beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-313) Enable the use of an existing spark context with the SparkPipelineRunner
Date Mon, 27 Aug 2018 11:40:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-313?focusedWorklogId=138351&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-138351 ]

ASF GitHub Bot logged work on BEAM-313:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Aug/18 11:39
            Start Date: 27/Aug/18 11:39
    Worklog Time Spent: 10m 
      Work Description: amarouni commented on issue #401: [BEAM-313] Enable the use of an existing spark context with the SparkPipelineRunner
URL: https://github.com/apache/beam/pull/401#issuecomment-416199029
 
 
   @kohlerm 
   
   Here's an old Scala snippet that shows how to use Beam with the Spark Job Server (SJS). It's based on old versions of SJS & Beam, so it probably won't compile against newer SJS/Beam versions, but you'll get the idea:
   
   ```scala
   import com.typesafe.config.Config
   import org.apache.beam.runners.spark.{ SparkContextOptions, SparkRunner }
   import org.apache.beam.sdk.Pipeline
   import org.apache.beam.sdk.coders.StringUtf8Coder
   import org.apache.beam.sdk.options.PipelineOptionsFactory
   import org.apache.beam.sdk.transforms.Create
   import org.apache.spark.SparkContext
   import org.apache.spark.api.java.JavaSparkContext
   import spark.jobserver.{ SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation }
   
   import scala.collection.JavaConversions
   import scala.util.Try
   
    /**
     * Beam word count test. Returns the word count of a word list
     * taken from the job config.
     */
   object BeamWordCount extends SparkJob {
     override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
   
        Try(config.getStringList("wordList"))
          .map(_ => SparkJobValid)
          .getOrElse(SparkJobInvalid("No wordList in context config"))
     }
   
     override def runJob(sc: SparkContext, jobConfig: Config): Any = {
   
        // Input word list taken from the job config
        val words = JavaConversions.asScalaBuffer(jobConfig.getStringList("wordList")).toList
   
       // Pipeline options
       val sparkPipelineOptions = PipelineOptionsFactory.as(classOf[SparkContextOptions])
       sparkPipelineOptions.setAppName("Beam WordCount test")
       sparkPipelineOptions.setRunner(classOf[SparkRunner])
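        // Hand the SJS-managed SparkContext to the runner instead of
        // letting it create its own.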
       sparkPipelineOptions.setUsesProvidedSparkContext(true)
       sparkPipelineOptions.setProvidedSparkContext(new JavaSparkContext(sc))
   
       // Pipeline
       val pipeline = Pipeline.create(sparkPipelineOptions)
   
       // Input + processing + Output
        val output = pipeline
          .apply(Create.of(JavaConversions.seqAsJavaList(words)).withCoder(StringUtf8Coder.of()))
          // CountWords is the composite word-count transform from the Beam
          // WordCount example; it is not defined in this snippet.
          .apply(new CountWords())
   
       // Result
       // val result: EvaluationResult = pipeline.run().asInstanceOf[EvaluationResult]
   
       // Run job & wait until finish
       pipeline.run().waitUntilFinish()
     }
   }
   ```
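   The important part is `setUsesProvidedSparkContext(true)` together with `setProvidedSparkContext(new JavaSparkContext(sc))`: they hand the SparkContext that SJS manages over to the Spark runner instead of letting the runner create its own.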
   
   It'd be nice to contribute this as new documentation if you manage to get it to work. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 138351)
    Time Spent: 1h 10m  (was: 1h)

> Enable the use of an existing spark context with the SparkPipelineRunner
> ------------------------------------------------------------------------
>
>                 Key: BEAM-313
>                 URL: https://issues.apache.org/jira/browse/BEAM-313
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-spark
>            Reporter: Abbass Marouni
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>             Fix For: 0.3.0-incubating
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> The general use case is that the SparkPipelineRunner creates its own Spark context and uses it for the pipeline execution.
> Another alternative is to provide the SparkPipelineRunner with an existing Spark context. This can be interesting for a lot of use cases where the Spark context is managed outside of Beam (context reuse, advanced context management, Spark job server, ...); a minimal sketch follows below.
> code sample : https://github.com/amarouni/incubator-beam/commit/fe0bb517bf0ccde07ef5a61f3e44df695b75f076
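A minimal sketch of the provided-context alternative described above, assuming the `SparkContextOptions` API used in the snippet earlier in this message; the object name and the externally supplied context are illustrative:

```scala
import org.apache.beam.runners.spark.{ SparkContextOptions, SparkRunner }
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext

// Hypothetical wrapper: in practice the SparkContext would be created and
// managed outside of Beam (e.g. by a job server) and passed in here.
object ProvidedContextSketch {
  def run(sc: SparkContext): Unit = {
    val options = PipelineOptionsFactory.as(classOf[SparkContextOptions])
    options.setRunner(classOf[SparkRunner])
    // Reuse the externally managed context instead of creating a new one.
    options.setUsesProvidedSparkContext(true)
    options.setProvidedSparkContext(new JavaSparkContext(sc))

    val pipeline = Pipeline.create(options)
    // ... apply transforms to the pipeline here ...
    pipeline.run().waitUntilFinish()
  }
}
```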



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
