spark-commits mailing list archives

From: t...@apache.org
Subject: [2/2] spark git commit: [SPARK-4806] Streaming doc update for 1.2
Date: Thu, 11 Dec 2014 14:21:30 GMT
[SPARK-4806] Streaming doc update for 1.2

Important updates to the streaming programming guide:
- Make the fault-tolerance properties easier to understand, with information about write ahead logs.
- Update the information about deploying a Spark Streaming application, with information about driver HA.
- Update the receiver guide to discuss reliable vs. unreliable receivers.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Josh Rosen <joshrosen@databricks.com>
Author: Josh Rosen <rosenville@gmail.com>

Closes #3653 from tdas/streaming-doc-update-1.2 and squashes the following commits:

f53154a [Tathagata Das] Addressed Josh's comments.
ce299e4 [Tathagata Das] Minor update.
ca19078 [Tathagata Das] Minor change
f746951 [Tathagata Das] Mentioned performance problem with WAL
7787209 [Tathagata Das] Merge branch 'streaming-doc-update-1.2' of github.com:tdas/spark into streaming-doc-update-1.2
2184729 [Tathagata Das] Updated Kafka and Flume guides with reliability information.
2f3178c [Tathagata Das] Added more information about writing reliable receivers in the custom receiver guide.
91aa5aa [Tathagata Das] Improved API Docs menu
5707581 [Tathagata Das] Added Python API badge
b9c8c24 [Tathagata Das] Merge pull request #26 from JoshRosen/streaming-programming-guide
b8c8382 [Josh Rosen] minor fixes
a4ef126 [Josh Rosen] Restructure parts of the fault-tolerance section to read a bit nicer when skipping over the headings
65f66cd [Josh Rosen] Fix broken link to fault-tolerance semantics section.
f015397 [Josh Rosen] Minor grammar / pluralization fixes.
3019f3a [Josh Rosen] Fix minor Markdown formatting issues
aa8bb87 [Tathagata Das] Small update.
195852c [Tathagata Das] Updated based on Josh's comments, updated receiver reliability and deploying section, and also updated configuration.
17b99fb [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-doc-update-1.2
a0217c0 [Tathagata Das] Changed Deploying menu layout
67fcffc [Tathagata Das] Added cluster mode + supervise example to submitting application guide.
e45453b [Tathagata Das] Update streaming guide, added deploying section.
192c7a7 [Tathagata Das] Added more info about Python API, and rewrote the checkpointing section.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b004150a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b004150a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b004150a

Branch: refs/heads/master
Commit: b004150adb503ddbb54d5cd544e39ad974497c41
Parents: 2a5b5fd
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Thu Dec 11 06:21:23 2014 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Thu Dec 11 06:21:23 2014 -0800

----------------------------------------------------------------------
 docs/_layouts/global.html           |   13 +-
 docs/configuration.md               |  133 ++--
 docs/streaming-custom-receivers.md  |   90 ++-
 docs/streaming-flume-integration.md |   13 +-
 docs/streaming-kafka-integration.md |   17 +
 docs/streaming-programming-guide.md | 1068 +++++++++++++++++-------------
 docs/submitting-applications.md     |   36 +-
 7 files changed, 819 insertions(+), 551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/_layouts/global.html
----------------------------------------------------------------------
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index 627ed37..8841f76 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -33,7 +33,7 @@
         <!-- Google analytics script -->
         <script type="text/javascript">
           var _gaq = _gaq || [];
-          _gaq.push(['_setAccount', 'UA-32518208-1']);
+          _gaq.push(['_setAccount', 'UA-32518208-2']);
           _gaq.push(['_trackPageview']);
 
           (function() {
@@ -79,9 +79,9 @@
                         <li class="dropdown">
                             <a href="#" class="dropdown-toggle" data-toggle="dropdown">API
Docs<b class="caret"></b></a>
                             <ul class="dropdown-menu">
-                                <li><a href="api/scala/index.html#org.apache.spark.package">Scaladoc</a></li>
-                                <li><a href="api/java/index.html">Javadoc</a></li>
-                                <li><a href="api/python/index.html">Python API</a></li>
+                                <li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li>
+                                <li><a href="api/java/index.html">Java</a></li>
+                                <li><a href="api/python/index.html">Python</a></li>
                             </ul>
                         </li>
 
@@ -91,10 +91,11 @@
                                 <li><a href="cluster-overview.html">Overview</a></li>
                                 <li><a href="submitting-applications.html">Submitting
Applications</a></li>
                                 <li class="divider"></li>
-                                <li><a href="ec2-scripts.html">Amazon EC2</a></li>
-                                <li><a href="spark-standalone.html">Standalone Mode</a></li>
+                                <li><a href="spark-standalone.html">Spark Standalone</a></li>
                                 <li><a href="running-on-mesos.html">Mesos</a></li>
                                 <li><a href="running-on-yarn.html">YARN</a></li>
+                                <li class="divider"></li>
+                                <li><a href="ec2-scripts.html">Amazon EC2</a></li>
                             </ul>
                         </li>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index d50b046..acee267 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -8,7 +8,7 @@ title: Spark Configuration
 Spark provides three locations to configure the system:
 
 * [Spark properties](#spark-properties) control most application parameters and can be set by using
-  a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) object, or through Java
+  a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object, or through Java
   system properties.
 * [Environment variables](#environment-variables) can be used to set per-machine settings, such as
   the IP address, through the `conf/spark-env.sh` script on each node.
@@ -23,8 +23,8 @@ application. These properties can be set directly on a
 (e.g. master URL and application name), as well as arbitrary key-value pairs through the
 `set()` method. For example, we could initialize an application with two threads as follows:
 
-Note that we run with local[2], meaning two threads - which represents "minimal" parallelism, 
-which can help detect bugs that only exist when we run in a distributed context. 
+Note that we run with local[2], meaning two threads - which represents "minimal" parallelism,
+which can help detect bugs that only exist when we run in a distributed context.
 
 {% highlight scala %}
 val conf = new SparkConf()
@@ -35,7 +35,7 @@ val sc = new SparkContext(conf)
 {% endhighlight %}
 
 Note that we can have more than 1 thread in local mode, and in cases like spark streaming, we may actually
-require one to prevent any sort of starvation issues.  
+require one to prevent any sort of starvation issues.
 
 ## Dynamically Loading Spark Properties
 In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
@@ -48,8 +48,8 @@ val sc = new SparkContext(new SparkConf())
 
 Then, you can supply configuration values at runtime:
 {% highlight bash %}
-./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false 
-  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

+./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false
+  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
 {% endhighlight %}
 
 The Spark shell and [`spark-submit`](submitting-applications.html)
@@ -123,7 +123,7 @@ of the most common options to set are:
   <td>
    Limit of total size of serialized results of all partitions for each Spark action (e.g. collect).
     Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size
-    is above this limit. 
+    is above this limit.
     Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory
    and memory overhead of objects in JVM). Setting a proper limit can protect the driver from
     out-of-memory errors.
@@ -218,6 +218,45 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.executor.logs.rolling.strategy</code></td>
+  <td>(none)</td>
+  <td>
+    Set the strategy of rolling of executor logs. By default it is disabled. It can
+    be set to "time" (time-based rolling) or "size" (size-based rolling). For "time",
+    use <code>spark.executor.logs.rolling.time.interval</code> to set the rolling interval.
+    For "size", use <code>spark.executor.logs.rolling.size.maxBytes</code> to set
+    the maximum file size for rolling.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.logs.rolling.time.interval</code></td>
+  <td>daily</td>
+  <td>
+    Set the time interval by which the executor logs will be rolled over.
+    Rolling is disabled by default. Valid values are `daily`, `hourly`, `minutely` or
+    any interval in seconds. See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+    for automatic cleaning of old logs.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.logs.rolling.size.maxBytes</code></td>
+  <td>(none)</td>
+  <td>
+    Set the max size of the file by which the executor logs will be rolled over.
+    Rolling is disabled by default. Value is set in terms of bytes.
+    See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+    for automatic cleaning of old logs.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.logs.rolling.maxRetainedFiles</code></td>
+  <td>(none)</td>
+  <td>
+    Sets the number of latest rolling log files that are going to be retained by the system.
+    Older log files will be deleted. Disabled by default.
+  </td>
+</tr>
+<tr>
   <td><code>spark.files.userClassPathFirst</code></td>
   <td>false</td>
   <td>
@@ -250,10 +289,11 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.python.profile.dump</code></td>
   <td>(none)</td>
   <td>
-    The directory which is used to dump the profile result before driver exiting. 
+    The directory which is used to dump the profile result before driver exiting.
     The results will be dumped as separated file for each RDD. They can be loaded
     by ptats.Stats(). If this is specified, the profile result will not be displayed
     automatically.
+  </td>
 </tr>
 <tr>
   <td><code>spark.python.worker.reuse</code></td>
@@ -269,8 +309,8 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
   <td>(none)</td>
   <td>
-    Add the environment variable specified by <code>EnvironmentVariableName</code> to the Executor 
-    process. The user can specify multiple of these and to set multiple environment variables. 
+    Add the environment variable specified by <code>EnvironmentVariableName</code> to the Executor
+    process. The user can specify multiple of these and to set multiple environment variables.
   </td>
 </tr>
 <tr>
@@ -475,9 +515,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>
    The codec used to compress internal data such as RDD partitions, broadcast variables and
    shuffle outputs. By default, Spark provides three codecs: <code>lz4</code>, <code>lzf</code>,
-    and <code>snappy</code>. You can also use fully qualified class names to specify the codec, 
-    e.g. 
-    <code>org.apache.spark.io.LZ4CompressionCodec</code>,    
+    and <code>snappy</code>. You can also use fully qualified class names to specify the codec,
+    e.g.
+    <code>org.apache.spark.io.LZ4CompressionCodec</code>,
     <code>org.apache.spark.io.LZFCompressionCodec</code>,
     and <code>org.apache.spark.io.SnappyCompressionCodec</code>.
   </td>
@@ -945,7 +985,7 @@ Apart from these, the following properties are also available, and may be useful
     (resources are executors in yarn mode, CPU cores in standalone mode)
     to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
     Regardless of whether the minimum ratio of resources has been reached,
-    the maximum amount of time it will wait before scheduling begins is controlled by config 
+    the maximum amount of time it will wait before scheduling begins is controlled by config
     <code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>.
   </td>
 </tr>
@@ -954,7 +994,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>30000</td>
   <td>
     Maximum amount of time to wait for resources to register before scheduling begins
-    (in milliseconds).  
+    (in milliseconds).
   </td>
 </tr>
 <tr>
@@ -1023,7 +1063,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>false</td>
   <td>
     Whether Spark acls should are enabled. If enabled, this checks to see if the user has
-    access permissions to view or modify the job.  Note this requires the user to be known, 
+    access permissions to view or modify the job.  Note this requires the user to be known,
    so if the user comes across as null no checks are done. Filters can be used with the UI
     to authenticate and set the user.
   </td>
@@ -1062,17 +1102,31 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.streaming.blockInterval</code></td>
   <td>200</td>
   <td>
-    Interval (milliseconds) at which data received by Spark Streaming receivers is coalesced
-    into blocks of data before storing them in Spark.
+    Interval (milliseconds) at which data received by Spark Streaming receivers is chunked
+    into blocks of data before storing them in Spark. Minimum recommended - 50 ms. See the
+    <a href="streaming-programming-guide.html#level-of-parallelism-in-data-receiving">performance
+     tuning</a> section in the Spark Streaming programing guide for more details.
   </td>
 </tr>
 <tr>
   <td><code>spark.streaming.receiver.maxRate</code></td>
   <td>infinite</td>
   <td>
-    Maximum rate (per second) at which each receiver will push data into blocks. Effectively,
-    each stream will consume at most this number of records per second.
+    Maximum number of records per second at which each receiver will receive data.
+    Effectively, each stream will consume at most this number of records per second.
     Setting this configuration to 0 or a negative number will put no limit on the rate.
+    See the <a href="streaming-programming-guide.html#deploying-applications">deployment guide</a>
+    in the Spark Streaming programming guide for more details.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.streaming.receiver.writeAheadLogs.enable</code></td>
+  <td>false</td>
+  <td>
+    Enable write ahead logs for receivers. All the input data received through receivers
+    will be saved to write ahead logs that will allow it to be recovered after driver failures.
+    See the <a href="streaming-programming-guide.html#deploying-applications">deployment guide</a>
+    in the Spark Streaming programming guide for more details.
   </td>
 </tr>
 <tr>
@@ -1086,45 +1140,6 @@ Apart from these, the following properties are also available, and may be useful
     higher memory usage in Spark.
   </td>
 </tr>
-<tr>
-  <td><code>spark.executor.logs.rolling.strategy</code></td>
-  <td>(none)</td>
-  <td>
-    Set the strategy of rolling of executor logs. By default it is disabled. It can
-    be set to "time" (time-based rolling) or "size" (size-based rolling). For "time",
-    use <code>spark.executor.logs.rolling.time.interval</code> to set the rolling interval.
-    For "size", use <code>spark.executor.logs.rolling.size.maxBytes</code> to set
-    the maximum file size for rolling.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.executor.logs.rolling.time.interval</code></td>
-  <td>daily</td>
-  <td>
-    Set the time interval by which the executor logs will be rolled over.
-    Rolling is disabled by default. Valid values are `daily`, `hourly`, `minutely` or
-    any interval in seconds. See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
-    for automatic cleaning of old logs.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.executor.logs.rolling.size.maxBytes</code></td>
-  <td>(none)</td>
-  <td>
-    Set the max size of the file by which the executor logs will be rolled over.
-    Rolling is disabled by default. Value is set in terms of bytes.
-    See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
-    for automatic cleaning of old logs.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.executor.logs.rolling.maxRetainedFiles</code></td>
-  <td>(none)</td>
-  <td>
-    Sets the number of latest rolling log files that are going to be retained by the system.
-    Older log files will be deleted. Disabled by default.
-  </td>
-</tr>
 </table>
 
 #### Cluster Managers
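
The executor log rolling and streaming properties added in this patch are ordinary Spark properties. A minimal Scala sketch of setting them on a `SparkConf` follows; the application name and all values are illustrative, the write ahead log key uses the spelling added in this patch, and the same keys could equally be supplied via `spark-submit --conf` or `spark-defaults.conf`.

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Roll executor logs over daily and keep only the three most recent files.
val conf = new SparkConf()
  .setAppName("StreamingConfExample")  // illustrative name
  .set("spark.executor.logs.rolling.strategy", "time")
  .set("spark.executor.logs.rolling.time.interval", "daily")
  .set("spark.executor.logs.rolling.maxRetainedFiles", "3")
  // Streaming properties documented in the same table; values are examples only.
  .set("spark.streaming.blockInterval", "200")                    // milliseconds
  .set("spark.streaming.receiver.maxRate", "10000")               // records per second per receiver
  .set("spark.streaming.receiver.writeAheadLogs.enable", "true")  // spelling as added in this patch

val ssc = new StreamingContext(conf, Seconds(1))
{% endhighlight %}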

http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/streaming-custom-receivers.md
----------------------------------------------------------------------
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 27cd085..6a20481 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -7,25 +7,30 @@ Spark Streaming can receive streaming data from any arbitrary data source beyond
 the one's for which it has in-built support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
 This requires the developer to implement a *receiver* that is customized for receiving data from
 the concerned data source. This guide walks through the process of implementing a custom receiver
-and using it in a Spark Streaming application.
+and using it in a Spark Streaming application. Note that custom receivers can be implemented
+in Scala or Java.
 
-### Implementing a Custom Receiver
+## Implementing a Custom Receiver
 
-This starts with implementing a [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver).
+This starts with implementing a **Receiver**
+([Scala doc](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver),
+[Java doc](api/java/org/apache/spark/streaming/receiver/Receiver.html)).
 A custom receiver must extend this abstract class by implementing two methods
+
 - `onStart()`: Things to do to start receiving data.
 - `onStop()`: Things to do to stop receiving data.
 
-Note that `onStart()` and `onStop()` must not block indefinitely. Typically, onStart() would start the threads
+Both `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` would start the threads
 that responsible for receiving the data and `onStop()` would ensure that the receiving by those threads
 are stopped. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they
 should stop receiving data.
 
 Once the data is received, that data can be stored inside Spark
-by calling `store(data)`, which is a method provided by the
-[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) class.
+by calling `store(data)`, which is a method provided by the Receiver class.
 There are number of flavours of `store()` which allow you store the received data
-record-at-a-time or as whole collection of objects / serialized bytes.
+record-at-a-time or as whole collection of objects / serialized bytes. Note that the flavour of
+`store()` used to implement a receiver affects its reliability and fault-tolerance semantics.
+This is discussed [later](#receiver-reliability) in more detail.
 
 Any exception in the receiving threads should be caught and handled properly to avoid silent
 failures of the receiver. `restart(<exception>)` will restart the receiver by
@@ -158,7 +163,7 @@ public class JavaCustomReceiver extends Receiver<String> {
 </div>
 
 
-### Using the custom receiver in a Spark Streaming application
+## Using the custom receiver in a Spark Streaming application
 
 The custom receiver can be used in a Spark Streaming application by using
 `streamingContext.receiverStream(<instance of custom receiver>)`. This will create
@@ -191,9 +196,68 @@ The full source code is in the example [JavaCustomReceiver.java](https://github.
 </div>
 </div>
 
-
-
-### Implementing and Using a Custom Actor-based Receiver
+## Receiver Reliability
+As discussed in brief in the
+[Spark Streaming Programming Guide](streaming-programming-guide.html#receiver-reliability),
+there are two kinds of receivers based on their reliability and fault-tolerance semantics.
+
+1. *Reliable Receiver* - For *reliable sources* that allow sent data to be acknowledged, a
+  *reliable receiver* correctly acknowledges to the source that the data has been received
+  and stored in Spark reliably (that is, replicated successfully). Usually,
+  implementing this receiver involves careful consideration of the semantics of source
+  acknowledgements.
+1. *Unreliable Receiver* - These are receivers for unreliable sources that do not support
+  acknowledging. Even for reliable sources, one may implement an unreliable receiver that
+  does not go into the complexity of acknowledging correctly.
+
+To implement a *reliable receiver*, you have to use `store(multiple-records)` to store data.
+This flavour of `store` is a blocking call which returns only after all the given records have
+been stored inside Spark. If the receiver's configured storage level uses replication
+(enabled by default), then this call returns after replication has completed.
+Thus it ensures that the data is reliably stored, and the receiver can now acknowledge the
+source appropriately. This ensures that no data is lost when the receiver fails in the middle
+of replicating data -- the buffered data will not be acknowledged and hence will be later resent
+by the source.
+
+An *unreliable receiver* does not have to implement any of this logic. It can simply receive
+records from the source and insert them one-at-a-time using `store(single-record)`. While it does
+not get the reliability guarantees of `store(multiple-records)`, it has the following advantages.
+
+- The system takes care of chunking that data into appropriate sized blocks (look for block
+interval in the [Spark Streaming Programming Guide](streaming-programming-guide.html)).
+- The system takes care of controlling the receiving rates if the rate limits have been specified.
+- Because of these two, unreliable receivers are simpler to implement than reliable receivers.
+
+The following table summarizes the characteristics of both types of receivers
+
+<table class="table">
+<tr>
+  <th>Receiver Type</th>
+  <th>Characteristics</th>
+</tr>
+<tr>
+  <td><b>Unreliable Receivers</b></td>
+  <td>
+    Simple to implement.<br>
+    System takes care of block generation and rate control.<br>
+    No fault-tolerance guarantees, can lose data on receiver failure.
+  </td>
+</tr>
+<tr>
+  <td><b>Reliable Receivers</b></td>
+  <td>
+    Strong fault-tolerance guarantees, can ensure zero data loss.<br/>
+    Block generation and rate control to be handled by the receiver implementation.<br/>
+    Implementation complexity depends on the acknowledgement mechanisms of the source.
+  </td>
+</tr>
+</table>
+
+## Implementing and Using a Custom Actor-based Receiver
 
 Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to
 receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
@@ -203,7 +267,7 @@ trait can be applied on any Akka actor, which allows received data to be stored
 {% highlight scala %}
 class CustomActor extends Actor with ActorHelper {
   def receive = {
-   case data: String => store(data)
+    case data: String => store(data)
   }
 }
 {% endhighlight %}
@@ -217,5 +281,3 @@ val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
 
 See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
 for an end-to-end example.
-
-
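
To make the reliable receiver contract described in this guide concrete, here is a minimal Scala sketch. It assumes a hypothetical `AckingSource` client (not part of Spark) that hands out batches and accepts acknowledgements; the key point is that the blocking `store(multiple-records)` call completes before the source is acknowledged.

{% highlight scala %}
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical client for an acknowledging source; stands in for whatever reliable source is used.
class AckingSource {
  def fetchBatch(): ArrayBuffer[String] = ArrayBuffer("record-1", "record-2")
  def ack(): Unit = { }
  def close(): Unit = { }
}

class ReliableSketchReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    // Start a thread that receives data; onStart() itself must not block.
    new Thread("Reliable Sketch Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop(): Unit = { }  // the receiving thread checks isStopped() and exits on its own

  private def receive(): Unit = {
    val source = new AckingSource
    while (!isStopped()) {
      val batch = source.fetchBatch()
      // Blocking, multi-record store: returns only after the records have been stored
      // (and replicated, per the storage level), so acknowledging afterwards is safe.
      store(batch)
      source.ack()
    }
    source.close()
  }
}
{% endhighlight %}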

http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/streaming-flume-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md
index d57c3e0..ac01dd3 100644
--- a/docs/streaming-flume-integration.md
+++ b/docs/streaming-flume-integration.md
@@ -66,9 +66,16 @@ configuring Flume agents.
 
 ## Approach 2 (Experimental): Pull-based Approach using a Custom Sink
Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following.
+
 - Flume pushes data into the sink, and the data stays buffered.
-- Spark Streaming uses transactions to pull data from the sink. Transactions succeed only after data is received and replicated by Spark Streaming.
-This ensures that better reliability and fault-tolerance than the previous approach. However, this requires configuring Flume to run a custom sink. Here are the configuration steps.
+- Spark Streaming uses a [reliable Flume receiver](streaming-programming-guide.html#receiver-reliability)
+  and transactions to pull data from the sink. Transactions succeed only after data is received and
+  replicated by Spark Streaming.
+
+This ensures stronger reliability and
+[fault-tolerance guarantees](streaming-programming-guide.html#fault-tolerance-semantics)
+than the previous approach. However, this requires configuring Flume to run a custom sink.
+Here are the configuration steps.
 
 #### General Requirements
Choose a machine that will run the custom sink in a Flume agent. The rest of the Flume pipeline is configured to send data to that agent. Machines in the Spark cluster should have access to the chosen machine running the custom sink.
@@ -104,7 +111,7 @@ See the [Flume's documentation](https://flume.apache.org/documentation.html) for
 configuring Flume agents.
 
 #### Configuring Spark Streaming Application
-1. **Linking:** In your SBT/Maven projrect definition, link your streaming application against the `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide).
+1. **Linking:** In your SBT/Maven project definition, link your streaming application against the `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide).
 
 2. **Programming:** In the streaming application code, import `FlumeUtils` and create input DStream as follows.
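
The diff cuts off before the code snippet that follows in the guide; for context, the pull-based approach described above is typically wired up with `FlumeUtils.createPollingStream`, roughly as in the sketch below. The hostname and port of the machine running the custom sink are placeholders.

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val conf = new SparkConf().setAppName("FlumePullingSketch")  // illustrative name
val ssc = new StreamingContext(conf, Seconds(10))

// Pull events from the custom Spark sink running in the Flume agent
// on the chosen machine (placeholder host and port).
val flumeStream = FlumeUtils.createPollingStream(ssc, "sink-host.example.com", 9999)
flumeStream.map(event => new String(event.event.getBody.array())).print()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}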
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md
index a3b705d..1c956fc 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -40,3 +40,20 @@ title: Spark Streaming + Kafka Integration Guide
 	- Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers.
 
 3. **Deploying:** Package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
+
+Note that the Kafka receiver used by default is an
+[*unreliable* receiver](streaming-programming-guide.html#receiver-reliability) (see the
+receiver reliability section in the programming guide). In Spark 1.2, we have added an
+experimental *reliable* Kafka receiver that provides stronger
+[fault-tolerance guarantees](streaming-programming-guide.html#fault-tolerance-semantics) of zero
+data loss on failures. This receiver is automatically used when the write ahead log
+(also introduced in Spark 1.2) is enabled
+(see the [Deploying](streaming-programming-guide.html#deploying-applications) section in the programming guide). This
+may reduce the receiving throughput of individual Kafka receivers compared to the unreliable
+receivers, but this can be corrected by running
+[more receivers in parallel](streaming-programming-guide.html#level-of-parallelism-in-data-receiving)
+to increase aggregate throughput. Additionally, it is recommended that the replication of the
+received data within Spark be disabled when the write ahead log is enabled as the log is already stored
+in a replicated storage system. This can be done by setting the storage level for the input
+stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use
+`KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`).
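
Putting the note above into code, a minimal sketch of an application that enables the write ahead log and creates the Kafka stream without in-memory replication might look as follows. The checkpoint directory, ZooKeeper quorum, consumer group, and topic are placeholders, and the property key uses the spelling added in this patch.

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("ReliableKafkaSketch")  // illustrative name
  .set("spark.streaming.receiver.writeAheadLogs.enable", "true")

val ssc = new StreamingContext(conf, Seconds(2))
// The write ahead log is written under the checkpoint directory (placeholder path).
ssc.checkpoint("hdfs://namenode:8020/checkpoints/reliable-kafka-sketch")

// Single-threaded consumer for one topic; no in-memory replication, since the
// write ahead log already persists the data in a replicated file system.
val topics = Map("my-topic" -> 1)
val lines = KafkaUtils.createStream(
  ssc, "zk-host:2181", "my-consumer-group", topics, StorageLevel.MEMORY_AND_DISK_SER)

lines.map(_._2).print()
ssc.start()
ssc.awaitTermination()
{% endhighlight %}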



