bahir-reviews mailing list archives

From emlaver <...@git.apache.org>
Subject [GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...
Date Tue, 18 Jul 2017 06:23:10 GMT
GitHub user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/45#discussion_r127890607
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---
    @@ -98,29 +99,81 @@ class DefaultSource extends RelationProvider
     
           val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
     
    -      var allDocsDF: DataFrame = null
    +      var dataFrame: DataFrame = null
     
           val schema: StructType = {
             if (inSchema != null) {
               inSchema
    -        } else {
    -          val df = if (config.getSchemaSampleSize() ==
    -            JsonStoreConfigManager.ALL_DOCS_LIMIT &&
    +        } else if (!config.isInstanceOf[CloudantChangesConfig]
    +          || config.viewName != null || config.indexName != null) {
    +          val df = if (config.getSchemaSampleSize ==
    +            JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
                 config.viewName == null
                 && config.indexName == null) {
                 val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
    -            allDocsDF = sqlContext.read.json(cloudantRDD)
    -            allDocsDF
    +            dataFrame = sqlContext.read.json(cloudantRDD)
    +            dataFrame
               } else {
                 val dataAccess = new JsonStoreDataAccess(config)
                 val aRDD = sqlContext.sparkContext.parallelize(
    -                dataAccess.getMany(config.getSchemaSampleSize()))
    +                dataAccess.getMany(config.getSchemaSampleSize))
                 sqlContext.read.json(aRDD)
               }
               df.schema
    +        } else {
    +          /* Create a streaming context to handle transforming docs in
    +          * larger databases into Spark datasets
    +          */
    +          val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
    +          val streamingMap = {
    +            val selector = config.asInstanceOf[CloudantChangesConfig].getSelector
    +            if (selector != null) {
    +              Map(
    +                "database" -> config.getDbname,
    +                "selector" -> selector
    +              )
    +            } else {
    +              Map(
    +                "database" -> config.getDbname
    +              )
    +            }
    +          }
    +
    +          val changes = ssc.receiverStream(
    +            new CloudantReceiver(sqlContext.sparkContext.getConf, streamingMap))
    +          changes.persist(config.asInstanceOf[CloudantChangesConfig]
    +            .getStorageLevelForStreaming)
    +
    +          // Global RDD that's created from union of all RDDs
    +          var globalRDD = ssc.sparkContext.emptyRDD[String]
    +
    +          logger.info("Loading data from Cloudant using "
    +            + config.asInstanceOf[CloudantChangesConfig].getContinuousChangesUrl)
    +
    --- End diff --
    
    @mayya-sharipova Updates and fixes in c2c7fc4.
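For readers following the schema-sampling branch of the diff above: the sketch below is a minimal, standalone illustration (not the Bahir code) of inferring a Spark schema from a sample of JSON documents, which is what the `sqlContext.read.json(...)` calls in the hunk accomplish. The document strings and names such as `sampleDocs` and `SchemaSampleSketch` are hypothetical; assumes Spark 2.2+ for `read.json(Dataset[String])`.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StructType

    object SchemaSampleSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("schema-sample-sketch")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        // Stand-in for a sample of fetched documents; in the diff these
        // come from JsonStoreDataAccess.getMany(config.getSchemaSampleSize).
        val sampleDocs = Seq(
          """{"_id": "a", "name": "x", "count": 1}""",
          """{"_id": "b", "name": "y", "count": 2}"""
        )

        // Spark infers a StructType from the sampled JSON strings,
        // analogous to sqlContext.read.json(aRDD).schema in the diff.
        val schema: StructType = spark.read.json(sampleDocs.toDS()).schema
        println(schema.treeString)

        spark.stop()
      }
    }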
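The streaming branch is the substance of the change: a receiver feeds the _changes response into a DStream, and each batch RDD is unioned into one accumulated RDD. The sketch below is a minimal reconstruction of that pattern under stated assumptions, not the PR's implementation: `ToyJsonReceiver` is a hypothetical stand-in for `CloudantReceiver`, and the batch interval, timeout, and stored documents are illustrative. `spark.streaming.unpersist` is disabled so the accumulated blocks survive until the final count.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver

    // Hypothetical stand-in for CloudantReceiver: stores a few JSON
    // strings so the union pattern below has something to accumulate.
    class ToyJsonReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
      def onStart(): Unit = {
        new Thread("toy-receiver") {
          override def run(): Unit =
            (1 to 3).foreach(i => store(s"""{"_id": "$i"}"""))
        }.start()
      }
      def onStop(): Unit = {}
    }

    object ChangesStreamSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("changes-sketch")
          .setMaster("local[2]")
          // Keep received blocks around so globalRDD stays computable
          // after its source batches have been processed.
          .set("spark.streaming.unpersist", "false")
        // Same 10-second batch interval as the diff.
        val ssc = new StreamingContext(conf, Seconds(10))

        val changes = ssc.receiverStream(new ToyJsonReceiver)
        changes.persist(StorageLevel.MEMORY_ONLY)

        // Accumulate every batch into one RDD, mirroring the diff's
        // globalRDD built from a union of all received RDDs.
        var globalRDD = ssc.sparkContext.emptyRDD[String]
        changes.foreachRDD { rdd =>
          globalRDD = globalRDD.union(rdd)
        }

        ssc.start()
        // Let a couple of batches run; a real job would detect that the
        // feed is drained (e.g. via the changes response) before stopping.
        ssc.awaitTerminationOrTimeout(25000)
        println(s"collected ${globalRDD.count()} docs")
        ssc.stop(stopSparkContext = true, stopGracefully = false)
      }
    }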

