From: lresende@apache.org
To: commits@bahir.apache.org
Subject: bahir git commit: [BAHIR-101] Update sql-cloudant readme and python examples
Date: Thu, 6 Apr 2017 15:32:58 +0000 (UTC)

Repository: bahir
Updated Branches:
  refs/heads/master 889de659c -> 561291bfc

[BAHIR-101] Update sql-cloudant readme and python examples

Closes #40.

Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/561291bf
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/561291bf
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/561291bf

Branch: refs/heads/master
Commit: 561291bfc17f8eae97318b39ea9cc2d80680d5ce
Parents: 889de65
Author: Esteban Laver
Authored: Mon Apr 3 18:05:44 2017 -0400
Committer: Luciano Resende
Committed: Thu Apr 6 08:28:10 2017 -0700

----------------------------------------------------------------------
 sql-cloudant/README.md                      | 306 ++++++++-----------
 sql-cloudant/examples/python/CloudantApp.py |   9 +-
 sql-cloudant/examples/python/CloudantDF.py  |   5 +-
 .../examples/python/CloudantDFOption.py     |   5 +-
 4 files changed, 143 insertions(+), 182 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/bahir/blob/561291bf/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
index 98a1c85..eaa8893 100644
--- a/sql-cloudant/README.md
+++ b/sql-cloudant/README.md
@@ -1,24 +1,12 @@
-Spark Cloudant Connector
-================
+A library for reading data from Cloudant or CouchDB databases using Spark SQL and Spark Streaming.
 
-Cloudant integration with Spark as Spark SQL external datasource, and Spark Streaming as a custom receiver.
+[IBM® Cloudant®](https://cloudant.com) is a document-oriented DataBase as a Service (DBaaS). It stores data as documents
+in JSON format. It's built with scalability, high availability, and durability in mind. It comes with a
+wide variety of indexing options including map-reduce, Cloudant Query, full-text indexing, and
+geospatial indexing. The replication capabilities make it easy to keep data in sync between database
+clusters, desktop PCs, and mobile devices.
 
-
-## Contents:
-0. [Linking](#Linking)
-1. [Implementation of RelationProvider](#implementation-of-relationProvider)
-2. [Implementation of Receiver](#implementation-of-Receiver)
-3. [Sample applications](#Sample-application)
-    1. [Using SQL In Python](#Using-SQL-In-Python)
-    2. [Using SQL In Scala](#Using-SQL-In-Scala)
-    3. [Using DataFrame In Python](#Using-DataFrame-In-Python)
-    4. [Using DataFrame In Scala](#Using-DataFrame-In-Scala)
-    5. [Using Streams In Scala](#Using-Streams-In-Scala)
-4. [Configuration Overview](#Configuration-Overview)
-5. [Known limitations and areas for improvement](#Known-limitations)
-
+[Apache CouchDB™](http://couchdb.apache.org) is open source database software that focuses on ease of use and having an architecture that "completely embraces the Web". It has a document-oriented NoSQL database architecture and is implemented in the concurrency-oriented language Erlang; it uses JSON to store data, JavaScript as its query language using MapReduce, and HTTP for an API.
 
 ## Linking
 
 Using Maven:
 
     <dependency>
         <groupId>org.apache.bahir</groupId>
         <artifactId>spark-sql-cloudant_2.11</artifactId>
         <version>2.2.0-SNAPSHOT</version>
     </dependency>
 
 This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
-For example, to include it when starting the spark shell:
 
     $ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT
 
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. The `--packages` argument can also be used with `bin/spark-submit`.
 
+Submit a job in Python:
+
+    spark-submit --master local[4] --jars <path to cloudant-spark jar> <path to python file>
+
+Submit a job in Scala:
+
+    spark-submit --class "<your class>" --master local[4] --jars <path to cloudant-spark jar> <path to your app jar>
+
 This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
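As a side note (not part of this commit): the same `--packages` coordinates can also be supplied programmatically when the session is built, via Spark's `spark.jars.packages` setting. A minimal sketch, assuming the Bahir artifact published above; the application name is a placeholder:

```python
from pyspark.sql import SparkSession

# Equivalent of `spark-submit --packages ...`: Spark resolves the artifact and its
# dependencies at startup and adds them to the driver and executor classpaths.
spark = SparkSession.builder \
    .appName("sql-cloudant-packages-demo") \
    .config("spark.jars.packages",
            "org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT") \
    .getOrCreate()
```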
+## Configuration options
+
+The configuration is obtained in the following sequence:
 
-### Implementation of RelationProvider
+1. default in the Config, which is set in the application.conf
+2. key in the SparkConf, which is set in SparkConf
+3. key in the parameters, which is set in a dataframe or temporary table options
+4. "spark."+key in the SparkConf (as these are treated as the ones passed in through spark-submit using the --conf option)
 
-[DefaultSource.scala](src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala) is a RelationProvider for loading data from Cloudant to Spark, and saving it back from Spark to Cloudant. It has the following functionalities:
+Here each subsequent configuration overrides the previous one. Thus, configuration set using a DataFrame option overrides what has been set in SparkConf. And configuration passed in spark-submit using --conf takes precedence over any setting in the code.
 
-Functionality | Enablement
---- | ---
-Table Option | database or path, search index, view
-Scan Type | PrunedFilteredScan
-Column Pruning | yes
-Predicates Push Down | _id or first predicate
-Parallel Loading | yes, except with search index
-Insert-able | yes
-
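To make the precedence concrete, a small illustrative sketch (this editor's, not from the commit; host and database names are placeholders): the same key is set on SparkConf and again as a DataFrame option, and the option value wins, while a `spark.`-prefixed value passed through `spark-submit --conf` would beat both:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("cloudant.host", "ACCOUNT.cloudant.com") \
    .config("jsonstore.rdd.partitions", "10") \
    .getOrCreate()

# Rule 3 beats rule 2: this DataFrame option overrides the SparkConf value above.
df = spark.read.format("org.apache.bahir.cloudant") \
    .option("jsonstore.rdd.partitions", "20") \
    .load("n_airportcodemapping")
```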
+### Configuration in application.conf
+
+Default values are defined [here](src/main/resources/application.conf).
+
+### Configuration on SparkConf
 
-### Implementation of Receiver
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.protocol|https|protocol to use to transfer data: http or https
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+jsonstore.rdd.partitions|10|the intended number of partitions used to drive JsonStoreRDD to load query results in parallel. The actual number is calculated from the total rows returned, subject to maxInPartition and minInPartition
+jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
+jsonstore.rdd.minInPartition|10|the min rows in a partition.
+jsonstore.rdd.requestTimeout|900000|the request timeout in milliseconds
+bulkSize|200|the bulk save size
+schemaSampleSize|"-1"|the sample size for RDD schema discovery. 1 means use only the first document; -1 means all documents; 0 is treated as 1; any number N means min(N, total) docs
+createDBOnSave|"false"|whether to create a new database during a save operation. If false, a database should already exist. If true, a new database will be created; if a database with the provided name already exists, an error will be raised.
 
-Spark Cloudant connector creates a discretized stream in Spark (Spark input DStream) out of Cloudant data sources. [CloudantReceiver.scala](src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala) is a custom Receiver that converts `_changes` feed from a Cloudant database to DStream in Spark. This allows all sorts of processing on this streamed data including [using DataFrames and SQL operations on it](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala).
+### Configuration on Spark SQL Temporary Table or DataFrame
 
-**NOTE:** Since CloudantReceiver for Spark Streaming is based on the `_changes` API, there are some limitations that application developers should be aware of. Firstly, results returned from `_changes` are partially ordered, and may not be presented in the order in which documents were updated. Secondly, in case of shards' unavailability, you may see duplicates: changes that have been seen already. Thus, it is up to applications using Spark Streaming with CloudantReceiver to keep track of the _changes they have processed and to detect duplicates.
+Besides all the configurations passed to a temporary table or dataframe through SparkConf, it is also possible to set the following configurations in a temporary table or dataframe using OPTIONS:
 
+Name | Default | Meaning
+--- |:---:| ---
+database||cloudant database name
+view||cloudant view w/o the database name. Only used for load.
+index||cloudant search index w/o the database name. Only used to load data with less than or equal to 200 results.
+path||cloudant: as database name if database is not present
+schemaSampleSize|"-1"|the sample size used to discover the schema for this temp table. -1 scans all documents
+bulkSize|200|the bulk save size
+createDBOnSave|"false"|whether to create a new database during a save operation. If false, a database should already exist. If true, a new database will be created; if a database with the provided name already exists, an error will be raised.
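For illustration only (this editor's sketch; credentials and database names are placeholders): connection-level keys from the first table go on SparkConf, while per-table keys from the second table ride along as DataFrame options:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("cloudant.host", "ACCOUNT.cloudant.com") \
    .config("cloudant.username", "USERNAME") \
    .config("cloudant.password", "PASSWORD") \
    .getOrCreate()

# Sample only 10 documents when discovering the schema of this table.
df = spark.read.format("org.apache.bahir.cloudant") \
    .option("schemaSampleSize", "10") \
    .load("n_flight")
```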
+For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value` can be a compound field. An example of loading data from a view:
 
+```python
+spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')")
 
-## Sample applications
+```
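A hypothetical follow-up query (this editor's sketch, not part of the commit) makes the derived three-column shape visible:

```python
# A view-backed table exposes only {id, key, value}; `value` may be a struct.
rows = spark.sql("SELECT id, key, value FROM flightTable1 LIMIT 5")
rows.printSchema()
rows.show()
```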
+```
 
-### Using SQL In Python
-
-[CloudantApp.py](examples/python/CloudantApp.py)
+### Configuration on Cloudant Receiver for Spark Streaming
 
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+database||cloudant database name
+selector|all documents|a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark.
+
+
+### Configuration in spark-submit using --conf option
+
+The above-stated configuration keys can also be set using the `spark-submit --conf` option. When passing configuration in spark-submit, make sure to add "spark." as a prefix to the keys.
+
+
+## Examples
+
+### Python API
+
+#### Using SQL In Python
+
+```python
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using temp tables")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .getOrCreate()
+
-#### Loading temp table from Cloudant db
+# Loading temp table from Cloudant db
+spark.sql(" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+print ('Total # of rows in airportData: ' + str(airportData.count()))
+for code in airportData.collect():
+    print (code._id)
+```
+See [CloudantApp.py](examples/python/CloudantApp.py) for examples.
 
-### Using SQL In Scala
+Submit job example:
+```
+spark-submit --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD sql-cloudant/examples/python/CloudantApp.py
+```
 
+#### Using DataFrame In Python
 
-[CloudantApp.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala)
+```python
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using dataframes")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .getOrCreate()
+
+# ***1. Loading dataframe from Cloudant db
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.cache()
+df.printSchema()
+df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
+df.filter(df._id >= 'CAA').select("_id",'airportName').show()
+```
+
+See [CloudantDF.py](examples/python/CloudantDF.py) for examples.
+
+If you do multiple operations on a dataframe (select, filter, etc.),
+you should persist it. Otherwise, every operation on the dataframe will load the same data from Cloudant again.
+Persisting will also speed up computation. This statement will persist an RDD in memory: `df.cache()`. Alternatively, for large dbs, to persist in memory & disk, use:
+
+```python
+from pyspark import StorageLevel
+# StorageLevel flags: (useDisk, useMemory, useOffHeap, deserialized, replication)
+df.persist(storageLevel = StorageLevel(True, True, False, True, 1))
+```
+
+[Sample code](examples/python/CloudantDFOption.py) on using DataFrame option to define cloudant configuration
+
+### Scala API
+
+#### Using SQL In Scala
 
 ```scala
 val spark = SparkSession
     .builder()
     .appName("Cloudant Spark SQL Example")
     .config("cloudant.host","ACCOUNT.cloudant.com")
     .config("cloudant.username", "USERNAME")
     .config("cloudant.password","PASSWORD")
     .getOrCreate()
 
@@ -122,7 +192,7 @@ import spark.implicits._
 spark.sql(
     s"""
     |CREATE TEMPORARY TABLE airportTable
-    |USING org.apache.bahir.cloudant.spark
+    |USING org.apache.bahir.cloudant
     |OPTIONS ( database 'n_airportcodemapping')
     """.stripMargin)
 // create a dataframe
 val airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
 airportData.printSchema()
 println(s"Total # of rows in airportData: " + airportData.count())
 // convert dataframe to array of Rows, and process each row
 airportData.map(t => "code: " + t(0) + ",name:" + t(1)).collect().foreach(println)
 ```
-### Using DataFrame In Python
-
-[CloudantDF.py](examples/python/CloudantDF.py).
-
-```python
-spark = SparkSession\
-    .builder\
-    .appName("Cloudant Spark SQL Example in Python using dataframes")\
-    .config("cloudant.host","ACCOUNT.cloudant.com")\
-    .config("cloudant.username", "USERNAME")\
-    .config("cloudant.password","PASSWORD")\
-    .config("jsonstore.rdd.partitions", 8)\
-    .getOrCreate()
-
-#### Loading dataframe from Cloudant db
-df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
-df.cache()
-df.printSchema()
-df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
-df.filter(df._id >= 'CAA').select("_id",'airportName').show()
-```
-
-In case of doing multiple operations on a dataframe (select, filter etc.),
-you should persist a dataframe. Otherwise, every operation on a dataframe will load the same data from Cloudant again.
-Persisting will also speed up computation. This statement will persist an RDD in memory: `df.cache()`. Alternatively for large dbs to persist in memory & disk, use:
-
-```python
-from pyspark import StorageLevel
-df.persist(storageLevel = StorageLevel(True, True, False, True, 1))
-```
-
-[Sample code on using DataFrame option to define cloudant configuration](examples/python/CloudantDFOption.py)
 
+See [CloudantApp.scala](examples/scala/src/main/scala/mytest/spark/CloudantApp.scala) for examples.
+Submit job example:
+```
+spark-submit --class org.apache.spark.examples.sql.cloudant.CloudantApp --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD /path/to/spark-sql-cloudant_2.11-2.2.0-SNAPSHOT-tests.jar
+```
 
 ### Using DataFrame In Scala
 
-[CloudantDF.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala)
 
-``` scala
+```scala
 val spark = SparkSession
     .builder()
     .appName("Cloudant Spark SQL Example with Dataframe")
     .config("cloudant.host","ACCOUNT.cloudant.com")
     .config("cloudant.username", "USERNAME")
     .config("cloudant.password","PASSWORD")
     .config("createDBOnSave","true") // to create a db on save
     .config("jsonstore.rdd.partitions", "20") // using 20 partitions
     .getOrCreate()
 
 // 1. Loading data from Cloudant db
 val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
 // Caching df in memory to speed computations
 // and not to retrieve data from cloudant again
 df.cache()
 df.printSchema()
 
 // 2. Saving dataframe to Cloudant db
 val df2 = df.filter(df("flightSegmentId") === "AA106")
     .select("flightSegmentId","economyClassBaseCost")
 df2.show()
 df2.write.format("org.apache.bahir.cloudant").save("n_flight2")
 ```
 
+See [CloudantDF.scala](examples/scala/src/main/scala/mytest/spark/CloudantDF.scala) for examples.
 
-[Sample code on using DataFrame option to define cloudant configuration](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala)
+[Sample code](examples/scala/src/main/scala/mytest/spark/CloudantDFOption.scala) on using DataFrame option to define Cloudant configuration.
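For readers following the Python API: the Scala save above has a direct Python counterpart (this editor's sketch, mirroring the call that appears in the CloudantDF.py diff further below):

```python
# Filter one flight segment and write it to another database. createDBOnSave
# creates "n_flight2" if it does not already exist (see the options table above).
df2 = df.filter(df.flightSegmentId == "AA106") \
        .select("flightSegmentId", "economyClassBaseCost")
df2.write.save("n_flight2", "org.apache.bahir.cloudant",
               bulkSize="100", createDBOnSave="true")
```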
 ### Using Streams In Scala
 
-[CloudantStreaming.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala)
 
 ```scala
 val ssc = new StreamingContext(sparkConf, Seconds(10))
 
 val changes = ssc.receiverStream(new CloudantReceiver(Map(
     "cloudant.host" -> "ACCOUNT.cloudant.com",
     "cloudant.username" -> "USERNAME",
     "cloudant.password" -> "PASSWORD",
     "database" -> "n_airportcodemapping")))
 
 changes.foreachRDD((rdd: RDD[String], time: Time) => {
     // Get the singleton instance of SparkSession
     val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
     println(s"========= $time =========")
     // Convert RDD[String] to DataFrame
     val changesDataFrame = spark.read.json(rdd)
     if (!changesDataFrame.schema.isEmpty) {
         changesDataFrame.printSchema()
         changesDataFrame.select("*").show()
     }
 })
 ssc.start()
 // run streaming for 120 secs
 Thread.sleep(120000L)
 ssc.stop(true)
 ```
 
+See [CloudantStreaming.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala) for examples.
+
+By default, Spark Streaming will load all documents from a database. If you want to limit the loading to
+specific documents, use the `selector` option of `CloudantReceiver` and specify your conditions
+(see the [CloudantStreamingSelector.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala)
+example for more details):
 
 ```scala
 val changes = ssc.receiverStream(new CloudantReceiver(Map(
     "cloudant.host" -> "ACCOUNT.cloudant.com",
     "cloudant.username" -> "USERNAME",
     "cloudant.password" -> "PASSWORD",
     "database" -> "sales",
     "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}")))
 ```
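One editorial aside, not from the commit: the escaped selector string above can be produced with `json.dumps` instead of hand-escaping quotes. A sketch in Python (the receiver itself is Scala; this only shows how the JSON value can be built):

```python
import json

# Build the Cloudant Query selector without hand-escaping quotes.
selector = json.dumps({"month": "May", "rep": "John"})
print(selector)  # {"month": "May", "rep": "John"}
```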
-## Configuration Overview
-
-The configuration is obtained in the following sequence:
-
-1. default in the Config, which is set in the application.conf
-2. key in the SparkConf, which is set in SparkConf
-3. key in the parameters, which is set in a dataframe or temporaty table options, or StreamReceiver
-4. "spark."+key in the SparkConf (as they are treated as the one passed in through spark-submit using --conf option)
-
-Here each subsequent configuration overrides the previous one. Thus, configuration set using DataFrame option overrides what has beens set in SparkConf. And configuration passed in spark-submit using --conf takes precedence over any setting in the code. When passing configuration in spark-submit, make sure adding "spark." as prefix to the keys.
-
-### Configuration in application.conf
-
-Default values are defined in [here](src/main/resources/application.conf)
-
-### Configuration on SparkConf
-
-Name | Default | Meaning
---- |:---:| ---
-cloudant.protocol|https|protocol to use to transfer data: http or https
-cloudant.host||cloudant host url
-cloudant.username||cloudant userid
-cloudant.password||cloudant password
-jsonstore.rdd.partitions|10|the number of partitions intent used to drive JsonStoreRDD loading query result in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition
-jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
-jsonstore.rdd.minInPartition|10|the min rows in a partition.
-jsonstore.rdd.requestTimeout|900000| the request timeout in milliseconds
-bulkSize|200| the bulk save size
-schemaSampleSize| "-1" | the sample size for RDD schema discovery. 1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs
-createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
-
-### Configuration on Spark SQL Temporary Table or DataFrame
-
-Besides overriding any SparkConf configuration, you can also set the following configurations at temporary table or dataframe level.
-
-Name | Default | Meaning
---- |:---:| ---
-database||cloudant database name
-view||cloudant view w/o the database name. only used for load.
-index||cloudant search index w/o the database name. only used for load data with less than or equal to 200 results.
-path||cloudant: as database name if database is not present
-
-#### View Specific
-
-For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value` can be a compound field. An example of loading data from a view:
-
-```python
-spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')")
-```
-
-### Configuration on Cloudant Receiver for Spark Streaming
-
-Besides overriding any SparkConf configuration, you can also set the following configurations at stream Receiver level
-
-Name | Default | Meaning
---- |:---:| ---
-database||cloudant database name
-selector| all documents| a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark.
-
 ## Known limitations and areas for improvement
 
 * Loading data from Cloudant search index will work only for up to 200 results.
 
 * Need to improve how number of partitions is determined for parallel loading

http://git-wip-us.apache.org/repos/asf/bahir/blob/561291bf/sql-cloudant/examples/python/CloudantApp.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantApp.py b/sql-cloudant/examples/python/CloudantApp.py
index 029f39b..c403aeb 100644
--- a/sql-cloudant/examples/python/CloudantApp.py
+++ b/sql-cloudant/examples/python/CloudantApp.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pprint
 from pyspark.sql import SparkSession
 
 spark = SparkSession\
@@ -30,16 +29,16 @@ spark = SparkSession\
 spark.sql(" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
 airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
 airportData.printSchema()
-print 'Total # of rows in airportData: ' + str(airportData.count())
+print ('Total # of rows in airportData: ' + str(airportData.count()))
 for code in airportData.collect():
-    print code._id
+    print (code._id)
 
 # ***2. Loading temp table from Cloudant search index
-print 'About to test org.apache.bahir.cloudant for flight with index'
+print ('About to test org.apache.bahir.cloudant for flight with index')
 spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', index '_design/view/_search/n_flights')")
 flightData = spark.sql("SELECT flightSegmentId, scheduledDepartureTime FROM flightTable1 WHERE flightSegmentId >'AA9' AND flightSegmentId<'AA95'")
 flightData.printSchema()
 for code in flightData.collect():
-    print 'Flight {0} on {1}'.format(code.flightSegmentId, code.scheduledDepartureTime)
+    print ('Flight {0} on {1}'.format(code.flightSegmentId, code.scheduledDepartureTime))

http://git-wip-us.apache.org/repos/asf/bahir/blob/561291bf/sql-cloudant/examples/python/CloudantDF.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantDF.py b/sql-cloudant/examples/python/CloudantDF.py
index c009e98..a8af0fa 100644
--- a/sql-cloudant/examples/python/CloudantDF.py
+++ b/sql-cloudant/examples/python/CloudantDF.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pprint
 from pyspark.sql import SparkSession
 
 # define cloudant related configuration
@@ -54,7 +53,7 @@ df2.write.save("n_flight2", "org.apache.bahir.cloudant",
                bulkSize = "100", createDBOnSave="true")
 total = df.filter(df.flightSegmentId >'AA9').select("flightSegmentId",
     "scheduledDepartureTime").orderBy(df.flightSegmentId).count()
-print "Total", total, "flights from table"
+print ("Total", total, "flights from table")
 
 # ***3. Loading dataframe from a Cloudant search index
@@ -63,7 +62,7 @@ df = spark.read.load(format="org.apache.bahir.cloudant", database="n_flight",
 df.printSchema()
 total = df.filter(df.flightSegmentId >'AA9').select("flightSegmentId",
     "scheduledDepartureTime").orderBy(df.flightSegmentId).count()
-print "Total", total, "flights from index"
+print ("Total", total, "flights from index")
 
 # ***4. Loading dataframe from a Cloudant view
http://git-wip-us.apache.org/repos/asf/bahir/blob/561291bf/sql-cloudant/examples/python/CloudantDFOption.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantDFOption.py b/sql-cloudant/examples/python/CloudantDFOption.py
index c045532..a7f5e38 100644
--- a/sql-cloudant/examples/python/CloudantDFOption.py
+++ b/sql-cloudant/examples/python/CloudantDFOption.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pprint
 from pyspark.sql import SparkSession
 
 spark = SparkSession\
@@ -55,7 +54,7 @@ df.printSchema()
 total = df.filter(df.flightSegmentId >'AA9') \
     .select("flightSegmentId", "scheduledDepartureTime") \
     .orderBy(df.flightSegmentId).count()
-print "Total", total, "flights from table"
+print ("Total", total, "flights from table")
 
 # ***3. Loading dataframe from Cloudant search index
@@ -69,4 +68,4 @@
 df.printSchema()
 total = df.filter(df.flightSegmentId >'AA9') \
     .select("flightSegmentId", "scheduledDepartureTime") \
     .orderBy(df.flightSegmentId).count()
-print "Total", total, "flights from index"
+print ("Total", total, "flights from index")
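A closing editorial note, tied to the "Known limitations" kept above: until the partition calculation improves, parallel loading can be tuned explicitly through the documented `jsonstore.rdd.*` keys. An illustrative sketch with placeholder credentials and database name:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("cloudant.host", "ACCOUNT.cloudant.com") \
    .config("cloudant.username", "USERNAME") \
    .config("cloudant.password", "PASSWORD") \
    .config("jsonstore.rdd.partitions", "20") \
    .config("jsonstore.rdd.minInPartition", "10") \
    .config("jsonstore.rdd.maxInPartition", "5000") \
    .getOrCreate()

# The actual partition count is derived from total rows returned,
# bounded by the min/max settings above.
df = spark.read.load("n_flight", "org.apache.bahir.cloudant")
print("partitions used:", df.rdd.getNumPartitions())
```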