spark-commits mailing list archives

From sro...@apache.org
Subject spark git commit: [DOCS][SS] fix structured streaming python example
Date Sun, 12 Mar 2017 08:29:51 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 e481a7381 -> f9833c66a


[DOCS][SS] fix structured streaming python example

## What changes were proposed in this pull request?

- SS Python example: calling `readStream()` / `writeStream()` as methods raises `TypeError: 'xxx' object is not callable`; in PySpark they are properties (see the sketch below).
- Some other doc issues (a typo in the output-modes compatibility matrix, a log message, and a Scaladoc comment).
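
A minimal sketch of the corrected pattern (assuming a local `SparkSession` and the guide's socket source on localhost:9999). Because `readStream` and `writeStream` are properties rather than methods in PySpark, the trailing parentheses are what triggered the `TypeError`:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StructuredSocketExample").getOrCreate()

# readStream is a property of SparkSession, so no parentheses.
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# writeStream is likewise a property of the streaming DataFrame.
query = socketDF \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()
```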

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17257 from uncleGen/docs-ss-python.

(cherry picked from commit e29a74d5b1fa3f9356b7af5dd7e3fce49bc8eb7d)
Signed-off-by: Sean Owen <sowen@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9833c66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9833c66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9833c66

Branch: refs/heads/branch-2.1
Commit: f9833c66a2f11414357854dae00e9e2448869254
Parents: e481a73
Author: uncleGen <hustyugm@gmail.com>
Authored: Sun Mar 12 08:29:37 2017 +0000
Committer: Sean Owen <sowen@cloudera.com>
Committed: Sun Mar 12 08:29:46 2017 +0000

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md    | 18 +++++++++---------
 .../execution/streaming/FileStreamSource.scala    |  2 +-
 .../streaming/dstream/FileInputDStream.scala      |  2 +-
 3 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f9833c66/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 45ee551..d316e04 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -545,7 +545,7 @@ spark = SparkSession. ...
 
 # Read text from socket 
 socketDF = spark \
-    .readStream() \
+    .readStream \
     .format("socket") \
     .option("host", "localhost") \
     .option("port", 9999) \
@@ -558,7 +558,7 @@ socketDF.printSchema()
 # Read all the csv files written atomically in a directory
 userSchema = StructType().add("name", "string").add("age", "integer")
 csvDF = spark \
-    .readStream() \
+    .readStream \
     .option("sep", ";") \
     .schema(userSchema) \
     .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
@@ -995,7 +995,7 @@ Here is the compatibility matrix.
         <br/><br/>
         Update mode uses watermark to drop old aggregation state.
         <br/><br/>
-        Complete mode does drop not old aggregation state since by definition this mode
+        Complete mode does not drop old aggregation state since by definition this mode
         preserves all data in the Result Table.
     </td>    
   </tr>
@@ -1217,13 +1217,13 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")
 
 # Print new data to console
 noAggDF \
-    .writeStream() \
+    .writeStream \
     .format("console") \
     .start()
 
 # Write new data to Parquet files
 noAggDF \
-    .writeStream() \
+    .writeStream \
     .format("parquet") \
     .option("checkpointLocation", "path/to/checkpoint/dir") \
     .option("path", "path/to/destination/dir") \
@@ -1234,14 +1234,14 @@ aggDF = df.groupBy("device").count()
 
 # Print updated aggregations to console
 aggDF \
-    .writeStream() \
+    .writeStream \
     .outputMode("complete") \
     .format("console") \
     .start()
 
 # Have all the aggregates in an in memory table. The query name will be the table name
 aggDF \
-    .writeStream() \
+    .writeStream \
     .queryName("aggregates") \
     .outputMode("complete") \
     .format("memory") \
@@ -1329,7 +1329,7 @@ query.lastProgress();    // the most recent progress update of this streaming query
 <div data-lang="python"  markdown="1">
 
 {% highlight python %}
-query = df.writeStream().format("console").start()   # get the query object
+query = df.writeStream.format("console").start()   # get the query object
 
 query.id()          # get the unique identifier of the running query that persists across restarts from checkpoint data
 
@@ -1674,7 +1674,7 @@ aggDF
 
 {% highlight python %}
 aggDF \
-    .writeStream() \
+    .writeStream \
     .outputMode("complete") \
     .option("checkpointLocation", "path/to/HDFS/dir") \
     .format("memory") \

http://git-wip-us.apache.org/repos/asf/spark/blob/f9833c66/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0f0b6f1..fd94bb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -86,7 +86,7 @@ class FileStreamSource(
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
 
   /**
    * Returns the maximum offset that can be retrieved from the source.
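
The corrected log line reports the source's `maxFilesPerBatch` and `maxFileAgeMs` settings, which come from the file source's options. A sketch of setting them from PySpark, assuming the standard `maxFilesPerTrigger` and `maxFileAge` option names (check the file-source options table in the guide for your version):

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("FileSourceOptions").getOrCreate()

userSchema = StructType().add("name", "string").add("age", "integer")

# maxFilesPerTrigger caps how many new files each micro-batch picks up;
# maxFileAge bounds how old a file may be before the source ignores it.
csvDF = spark \
    .readStream \
    .schema(userSchema) \
    .option("maxFilesPerTrigger", 100) \
    .option("maxFileAge", "7d") \
    .csv("/path/to/directory")
```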

http://git-wip-us.apache.org/repos/asf/spark/blob/f9833c66/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index ed93058..905b1c5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -230,7 +230,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    * - It must pass the user-provided file filter.
    * - It must be newer than the ignore threshold. It is assumed that files older than the ignore
    *   threshold have already been considered or are existing files before start
-   *   (when newFileOnly = true).
+   *   (when newFilesOnly = true).
    * - It must not be present in the recently selected files that this class remembers.
    * - It must not be newer than the time of the batch (i.e. `currentTime` for which this
    *   file is being tested. This can occur if the driver was recovered, and the missing batches
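
The Scaladoc above describes how `FileInputDStream` decides which files belong to a batch. For reference, a minimal sketch of driving the legacy DStream file monitoring from PySpark (assuming a 1-second batch interval and an existing directory to watch):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="FileInputDStreamExample")
ssc = StreamingContext(sc, batchDuration=1)  # 1-second batches

# Each batch picks up files that pass the selection criteria above:
# newer than the ignore threshold, not already remembered, and not
# newer than the batch time being processed.
lines = ssc.textFileStream("/path/to/directory")
lines.pprint()

ssc.start()
ssc.awaitTermination()
```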


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

