spark-commits mailing list archives

From t...@apache.org
Subject spark git commit: [SPARK-23454][SS][DOCS] Added trigger information to the Structured Streaming programming guide
Date Wed, 21 Feb 2018 02:16:28 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 1d78f03ae -> 3e7269eb9


[SPARK-23454][SS][DOCS] Added trigger information to the Structured Streaming programming
guide

## What changes were proposed in this pull request?

- Added clear information about triggers
- Made the semantics guarantees of watermarks more clear for streaming aggregations and stream-stream
joins.

## How was this patch tested?


Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20631 from tdas/SPARK-23454.

(cherry picked from commit 601d653bff9160db8477f86d961e609fc2190237)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e7269eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e7269eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e7269eb

Branch: refs/heads/branch-2.3
Commit: 3e7269eb904b591883300d7433e5c99be0b3b5b3
Parents: 1d78f03
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Tue Feb 20 18:16:10 2018 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Feb 20 18:16:23 2018 -0800

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md | 214 +++++++++++++++++++-
 1 file changed, 207 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3e7269eb/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 48d6d0b..9a83f15 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -904,7 +904,7 @@ windowedCounts <- count(
 </div>
 
 
-### Handling Late Data and Watermarking
+#### Handling Late Data and Watermarking
 Now consider what happens if one of the events arrives late to the application.
 For example, say, a word generated at 12:04 (i.e. event time) could be received by 
 the application at 12:11. The application should use the time 12:04 instead of 12:11
@@ -925,7 +925,9 @@ specifying the event time column and the threshold on how late the data is expec
 event time. For a specific window starting at time `T`, the engine will maintain state and allow late
 data to update the state until `(max event time seen by the engine - late threshold > T)`. 
 In other words, late data within the threshold will be aggregated, 
-but data later than the threshold will be dropped. Let's understand this with an example. We can 
+but data later than the threshold will start getting dropped
+(see [later](#semantic-guarantees-of-aggregation-with-watermarking)
+in this section for the exact guarantees). Let's understand this with an example. We can
 easily define watermarking on the previous example using `withWatermark()` as shown below.
 
 <div class="codetabs">
@@ -1031,7 +1033,9 @@ then drops intermediate state of a window < watermark, and appends the final
 counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` is 
 appended to the Result Table only after the watermark is updated to `12:11`. 
 
-**Conditions for watermarking to clean aggregation state**
+##### Conditions for watermarking to clean aggregation state
+{:.no_toc}
+
 It is important to note that the following conditions must be satisfied for the watermarking to 
 clean the state in aggregation queries *(as of Spark 2.1.1, subject to change in the future)*.
 
@@ -1051,6 +1055,16 @@ from the aggregation column.
 For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append output mode.
 
+##### Semantic Guarantees of Aggregation with Watermarking
+{:.no_toc}
+
+- A watermark delay (set with `withWatermark`) of "2 hours" guarantees that the engine will never
+drop any data that is less than 2 hours delayed. In other words, any data less than 2 hours behind
+(in terms of event-time) the latest data processed till then is guaranteed to be aggregated.
+
+- However, the guarantee is strict only in one direction. Data delayed by more than 2 hours is
+not guaranteed to be dropped; it may or may not get aggregated. The more delayed the data is,
+the less likely the engine is to process it.
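
The retention rule quoted above, `(max event time seen by the engine - late threshold > T)`, can be sketched in plain Python. This is an illustrative model of the described semantics only, not Spark's implementation; the class and method names are hypothetical.

```python
# Illustrative model of the watermark rule described above: a window
# starting at time T keeps accepting late rows while the watermark
# (max event time seen minus the late threshold) has not passed T.
from datetime import datetime, timedelta

class WatermarkSimulator:
    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = None  # latest event time seen so far

    def observe(self, event_time: datetime) -> None:
        # Track the maximum event time across all rows seen.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    @property
    def watermark(self) -> datetime:
        return self.max_event_time - self.delay

    def accepts(self, window_start: datetime) -> bool:
        # State for the window is kept (late data still aggregated) while
        # the watermark has not moved past the window's start time T.
        return self.watermark <= window_start

sim = WatermarkSimulator(delay=timedelta(minutes=10))
sim.observe(datetime(2018, 2, 21, 12, 20))          # latest event: 12:20
print(sim.watermark)                                 # 2018-02-21 12:10:00
print(sim.accepts(datetime(2018, 2, 21, 12, 10)))    # True: window 12:10 still open
print(sim.accepts(datetime(2018, 2, 21, 12, 0)))     # False: window 12:00 state dropped
```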
 
 ### Join Operations
 Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame
@@ -1062,7 +1076,7 @@ Dataset/DataFrame will be the exactly the same as if it was with a static Dataset
 containing the same data in the stream.
 
 
-#### Stream-static joins
+#### Stream-static Joins
 
 Since the introduction in Spark 2.0, Structured Streaming has supported joins (inner join and some
 type of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.
@@ -1269,6 +1283,12 @@ joined <- join(
 </div>
 </div>
 
+###### Semantic Guarantees of Stream-stream Inner Joins with Watermarking
+{:.no_toc}
+This is similar to the [guarantees provided by watermarking on aggregations](#semantic-guarantees-of-aggregation-with-watermarking).
+A watermark delay of "2 hours" guarantees that the engine will never drop any data that is less than
+2 hours delayed. But data delayed by more than 2 hours may or may not get processed.
+
 ##### Outer Joins with Watermarking
 While the watermark + event-time constraints is optional for inner joins, for left and right outer
 joins they must be specified. This is because for generating the NULL results in outer join, the
@@ -1347,7 +1367,14 @@ joined <- join(
 </div>
 
 
-There are a few points to note regarding outer joins.
+###### Semantic Guarantees of Stream-stream Outer Joins with Watermarking
+{:.no_toc}
+Outer joins have the same guarantees as [inner joins](#semantic-guarantees-of-stream-stream-inner-joins-with-watermarking)
+regarding watermark delays and whether data will be dropped or not.
+
+###### Caveats
+{:.no_toc}
+There are a few important characteristics to note regarding how the outer results are generated.
 
 - *The outer NULL results will be generated with a delay that depends on the specified watermark
 delay and the time range condition.* This is because the engine has to wait for that long to ensure
@@ -1962,7 +1989,7 @@ head(sql("select * from aggregates"))
 </div>
 </div>
 
-#### Using Foreach
+##### Using Foreach
 The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
 ([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
 which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
@@ -1979,6 +2006,172 @@ which has methods that get called whenever there is a sequence of rows generated
 
 - Whenever `open` is called, `close` will also be called (unless the JVM exits due to some error). This is true even if `open` returns false. If there is any error in processing and writing the data, `close` will be called with the error. It is your responsibility to clean up state (e.g. connections, transactions, etc.) that have been created in `open` such that there are no resource leaks.
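
The open/process/close contract described in the points above can be modeled in plain Python. The actual interface is the Scala/Java `ForeachWriter`; everything below is an illustrative stand-in, not Spark API.

```python
# Illustrative model of the ForeachWriter lifecycle: close() is always
# called after open(), even when open() returns False or process() raises.
class DemoWriter:
    def __init__(self):
        self.closed_with = "never-closed"

    def open(self, partition_id, epoch_id):
        return False  # signal the engine to skip this partition's rows

    def process(self, row):
        raise RuntimeError("unreachable when open returned False")

    def close(self, error):
        # Clean up state created in open (connections, transactions, ...).
        self.closed_with = error

def run_partition(writer, rows, partition_id=0, epoch_id=0):
    """Mimics the engine's per-partition driving of the writer."""
    error = None
    try:
        if writer.open(partition_id, epoch_id):
            for row in rows:
                writer.process(row)
    except Exception as e:
        error = e
    finally:
        writer.close(error)  # always invoked, with the error if any
    return error

w = DemoWriter()
run_partition(w, ["a", "b"])
print(w.closed_with)  # None: close was called even though open returned False
```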
 
+#### Triggers
+The trigger settings of a streaming query define the timing of streaming data processing: whether
+the query is going to be executed as a micro-batch query with a fixed batch interval or as a
+continuous processing query.
+Here are the different kinds of triggers that are supported.
+
+<table class="table">
+  <tr>
+    <th>Trigger Type</th>
+    <th>Description</th>
+  </tr>
+  <tr>
+    <td><i>unspecified (default)</i></td>
+    <td>
+        If no trigger setting is explicitly specified, then by default, the query will be
+        executed in micro-batch mode, where micro-batches will be generated as soon as
+        the previous micro-batch has completed processing.
+    </td>
+  </tr>
+  <tr>
+    <td><b>Fixed interval micro-batches</b></td>
+    <td>
+        The query will be executed in micro-batch mode, where micro-batches will be kicked off
+        at the user-specified intervals.
+        <ul>
+          <li>If the previous micro-batch completes within the interval, then the engine will wait until
+          the interval is over before kicking off the next micro-batch.</li>
+
+          <li>If the previous micro-batch takes longer than the interval to complete (i.e. if an
+          interval boundary is missed), then the next micro-batch will start as soon as the
+          previous one completes (i.e., it will not wait for the next interval boundary).</li>
+
+          <li>If no new data is available, then no micro-batch will be kicked off.</li>
+        </ul>
+    </td>
+  </tr>
+  <tr>
+    <td><b>One-time micro-batch</b></td>
+    <td>
+        The query will execute <b>only one</b> micro-batch to process all the available data and then
+        stop on its own. This is useful in scenarios where you want to periodically spin up a cluster,
+        process everything that is available since the last period, and then shut down the
+        cluster. In some cases, this may lead to significant cost savings.
+    </td>
+  </tr>
+  <tr>
+    <td><b>Continuous with fixed checkpoint interval</b><br/><i>(experimental)</i></td>
+    <td>
+        The query will be executed in the new low-latency, continuous processing mode. Read more
+        about this in the <a href="#continuous-processing-experimental">Continuous Processing section</a> below.
+    </td>
+  </tr>
+</table>
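
The fixed-interval scheduling rules in the table above can be sketched as a small pure-Python function. This is an illustrative model only, not Spark's scheduler; the function name is hypothetical.

```python
# Model of the fixed-interval micro-batch rules: finish early -> wait for
# the next interval boundary; overrun the boundary -> start immediately
# after the previous batch completes.
def next_batch_start(prev_start: float, prev_duration: float, interval: float) -> float:
    """Return when the next micro-batch may start (times in seconds)."""
    prev_end = prev_start + prev_duration
    next_boundary = prev_start + interval
    if prev_end <= next_boundary:
        # Completed within the interval: wait until the boundary.
        return next_boundary
    # Missed the boundary: kick off as soon as the previous one completes,
    # without waiting for the following boundary.
    return prev_end

print(next_batch_start(0.0, 0.5, 2.0))  # 2.0: waited for the boundary
print(next_batch_start(0.0, 3.1, 2.0))  # 3.1: started right after the overrun
```

(If no new data is available, the engine simply skips the batch; that case is omitted here for brevity.)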
+
+Here are a few code examples.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+import org.apache.spark.sql.streaming.Trigger
+
+// Default trigger (runs micro-batch as soon as it can)
+df.writeStream
+  .format("console")
+  .start()
+
+// ProcessingTime trigger with two-seconds micro-batch interval
+df.writeStream
+  .format("console")
+  .trigger(Trigger.ProcessingTime("2 seconds"))
+  .start()
+
+// One-time trigger
+df.writeStream
+  .format("console")
+  .trigger(Trigger.Once())
+  .start()
+
+// Continuous trigger with one-second checkpointing interval
+df.writeStream
+  .format("console")
+  .trigger(Trigger.Continuous("1 second"))
+  .start()
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+import org.apache.spark.sql.streaming.Trigger;
+
+// Default trigger (runs micro-batch as soon as it can)
+df.writeStream
+  .format("console")
+  .start();
+
+// ProcessingTime trigger with two-seconds micro-batch interval
+df.writeStream
+  .format("console")
+  .trigger(Trigger.ProcessingTime("2 seconds"))
+  .start();
+
+// One-time trigger
+df.writeStream
+  .format("console")
+  .trigger(Trigger.Once())
+  .start();
+
+// Continuous trigger with one-second checkpointing interval
+df.writeStream
+  .format("console")
+  .trigger(Trigger.Continuous("1 second"))
+  .start();
+
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+
+# Default trigger (runs micro-batch as soon as it can)
+df.writeStream \
+  .format("console") \
+  .start()
+
+# ProcessingTime trigger with two-seconds micro-batch interval
+df.writeStream \
+  .format("console") \
+  .trigger(processingTime='2 seconds') \
+  .start()
+
+# One-time trigger
+df.writeStream \
+  .format("console") \
+  .trigger(once=True) \
+  .start()
+
+# Continuous trigger with one-second checkpointing interval
+df.writeStream \
+  .format("console") \
+  .trigger(continuous='1 second') \
+  .start()
+
+{% endhighlight %}
+</div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+# Default trigger (runs micro-batch as soon as it can)
+write.stream(df, "console")
+
+# ProcessingTime trigger with two-seconds micro-batch interval
+write.stream(df, "console", trigger.processingTime = "2 seconds")
+
+# One-time trigger
+write.stream(df, "console", trigger.once = TRUE)
+
+# Continuous trigger is not yet supported
+{% endhighlight %}
+</div>
+</div>
+
+
 ## Managing Streaming Queries
 The `StreamingQuery` object created when a query is started can be used to monitor and manage the query. 
 
@@ -2516,7 +2709,10 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
 </div>
 </div>
 
-# Continuous Processing [Experimental]
+# Continuous Processing
+## [Experimental]
+{:.no_toc}
+
 **Continuous processing** is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees. Compare this with the default *micro-batch processing* engine, which can achieve exactly-once guarantees but achieves latencies of ~100ms at best. For some types of queries (discussed below), you can choose which mode to execute them in without modifying the application logic (i.e. without changing the DataFrame/Dataset operations). 
 
 To run a supported query in continuous processing mode, all you need to do is specify a **continuous trigger** with the desired checkpoint interval as a parameter. For example, 
@@ -2589,6 +2785,8 @@ spark \
 A checkpoint interval of 1 second means that the continuous processing engine will record the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, hence any query can be restarted with any trigger. For example, a supported query started with the micro-batch mode can be restarted in continuous mode, and vice versa. Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.
 
 ## Supported Queries
+{:.no_toc}
+
 As of Spark 2.3, only the following types of queries are supported in the continuous processing mode.
 
 - *Operations*: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (`select`, `map`, `flatMap`, `mapPartitions`, etc.) and selections (`where`, `filter`, etc.).
@@ -2606,6 +2804,8 @@ As of Spark 2.3, only the following type of queries are supported in the continu
 See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections for more details on them. While the console sink is good for testing, the end-to-end low-latency processing can be best observed with Kafka as the source and sink, as this allows the engine to process the data and make the results available in the output topic within milliseconds of the input data being available in the input topic.
 
 ## Caveats
+{:.no_toc}
+
 - Continuous processing engine launches multiple long-running tasks that continuously read data from sources, process it and continuously write to sinks. The number of tasks required by the query depends on how many partitions the query can read from the sources in parallel. Therefore, before starting a continuous processing query, you must ensure there are enough cores in the cluster to run all the tasks in parallel. For example, if you are reading from a Kafka topic that has 10 partitions, then the cluster must have at least 10 cores for the query to make progress.
 - Stopping a continuous processing stream may produce spurious task termination warnings. These can be safely ignored.
 - There are currently no automatic retries of failed tasks. Any failure will lead to the query being stopped and it needs to be manually restarted from the checkpoint.

