bahir-reviews mailing list archives

From ricellis <...@git.apache.org>
Subject [GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...
Date Mon, 10 Jul 2017 14:43:12 GMT
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/45#discussion_r126438628
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---
    @@ -98,29 +99,89 @@ class DefaultSource extends RelationProvider
     
           val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
     
    -      var allDocsDF: DataFrame = null
    +      var dataFrame: DataFrame = null
     
           val schema: StructType = {
             if (inSchema != null) {
               inSchema
    -        } else {
    -          val df = if (config.getSchemaSampleSize() ==
    -            JsonStoreConfigManager.ALL_DOCS_LIMIT &&
    +        } else if (!config.isInstanceOf[CloudantChangesConfig]
    +          || config.viewName != null || config.indexName != null) {
    +          val df = if (config.getSchemaSampleSize ==
    +            JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
                 config.viewName == null
                 && config.indexName == null) {
                 val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
    -            allDocsDF = sqlContext.read.json(cloudantRDD)
    -            allDocsDF
    +            dataFrame = sqlContext.read.json(cloudantRDD)
    +            dataFrame
               } else {
                 val dataAccess = new JsonStoreDataAccess(config)
                 val aRDD = sqlContext.sparkContext.parallelize(
    -                dataAccess.getMany(config.getSchemaSampleSize()))
    +                dataAccess.getMany(config.getSchemaSampleSize))
                 sqlContext.read.json(aRDD)
               }
               df.schema
    +        } else {
    +          /* Create a streaming context to handle transforming docs in
    +          * larger databases into Spark datasets
    +          */
    +          /* Allow the raw data and persisted RDDs to be accessible outside
    +          * of the streaming context.
    +          * See https://spark.apache.org/docs/latest/configuration.html
    +          * for more details.
    +          */
    +          sqlContext.sparkSession.conf.set("spark.streaming.unpersist", "false")
    +
    +          val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
    +          val streamingMap = {
    +            val selector = config.asInstanceOf[CloudantChangesConfig].getSelector
    +            if (selector != null) {
    +              Map(
    +                "database" -> config.getDbname,
    +                "selector" -> selector
    +              )
    +            } else {
    +              Map(
    +                "database" -> config.getDbname
    +              )
    +            }
    +          }
    +
    +          val changes = ssc.receiverStream(
    +            new CloudantReceiver(sqlContext.sparkContext.getConf, streamingMap))
    +          changes.persist(config.asInstanceOf[CloudantChangesConfig]
    +            .getStorageLevelForStreaming)
    +
    +          // Global RDD that's created from union of all RDDs
    +          var globalRDD = ssc.sparkContext.emptyRDD[String]
    +
    +          logger.info("Loading data from Cloudant using "
    +            + config.asInstanceOf[CloudantChangesConfig].getContinuousChangesUrl)
    +
    +          // Collect and union each RDD to convert all RDDs to a DataFrame
    +          changes.foreachRDD((rdd: RDD[String]) => {
    +            if (!rdd.isEmpty()) {
    +              if (globalRDD != null) {
    +                // Union RDDs in foreach loop
    +                globalRDD = globalRDD.union(rdd)
    +              } else {
    +                globalRDD = rdd
    +              }
    +            } else {
    +              // Convert final global RDD[String] to DataFrame
    +              dataFrame = sqlContext.sparkSession.read.json(globalRDD)
    +              ssc.stop(stopSparkContext = false, stopGracefully = false)
    +            }
    +          })
    +
    +          ssc.start
    +          // run streaming until all docs from continuous feed are received
    +          ssc.awaitTermination
    +          // ssc.stop(stopSparkContext = false, stopGracefully = false)
    --- End diff --
    
    Commented-out code?
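
For context, a minimal, self-contained sketch of the drain-then-stop pattern the diff relies on: union each micro-batch on the driver and stop the StreamingContext from inside foreachRDD once an empty batch signals the feed is drained. The queueStream input, object name, sample documents, and local[2] master below are illustrative stand-ins, not part of the PR.

    import scala.collection.mutable

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object DrainToDataFrameSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]")
          .appName("drain-to-dataframe-sketch").getOrCreate()
        val sc = spark.sparkContext

        // Mirrors the diff: keep batch RDDs around after each interval so the
        // unioned RDD can still be read once streaming has stopped.
        spark.conf.set("spark.streaming.unpersist", "false")

        val ssc = new StreamingContext(sc, Seconds(1))

        // Stand-in source: two non-empty batches, then empty default batches.
        val batches = mutable.Queue(
          sc.parallelize(Seq("""{"_id":"a","n":1}""", """{"_id":"b","n":2}""")),
          sc.parallelize(Seq("""{"_id":"c","n":3}""")))
        val stream = ssc.queueStream(batches, oneAtATime = true,
          defaultRDD = sc.emptyRDD[String])

        // Union each non-empty batch on the driver; an empty batch means the
        // feed is drained, so stop streaming from inside foreachRDD and keep
        // the SparkContext alive (the same approach the diff takes).
        var unioned: RDD[String] = sc.emptyRDD[String]
        stream.foreachRDD { rdd =>
          if (!rdd.isEmpty()) {
            unioned = unioned.union(rdd)
          } else {
            ssc.stop(stopSparkContext = false, stopGracefully = false)
          }
        }

        ssc.start()
        // Blocks until the stop() above runs; no further ssc.stop is needed.
        ssc.awaitTermination()

        val df = spark.read.json(unioned)
        df.show()
      }
    }

Because the context is stopped from inside foreachRDD once the empty batch arrives, awaitTermination returns on its own, which is why the trailing, commented-out ssc.stop call flagged above looks redundant.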


