spark-commits mailing list archives

From t...@apache.org
Subject spark git commit: [SPARK-9217] [STREAMING] Make the kinesis receiver reliable by recording sequence numbers
Date Wed, 05 Aug 2015 07:20:33 GMT
Repository: spark
Updated Branches:
  refs/heads/master 781c8d71a -> c2a71f071


[SPARK-9217] [STREAMING] Make the kinesis receiver reliable by recording sequence numbers

This PR is the second one in the larger effort to make the Kinesis integration reliable and provide a WAL-free at-least-once guarantee. It is based on the design doc - https://docs.google.com/document/d/1k0dl270EnK7uExrsCE7jYw7PYx0YC935uBcxn3p0f58/edit

In this PR, I have updated the Kinesis Receiver to do the following.
- Control the block generation, by creating its own BlockGenerator with its own callback methods and using it to keep track of the ranges of sequence numbers that go into each block.
- More specifically, as the KinesisRecordProcessor provides small batches of records, each batch is inserted into the block atomically (that is, either the whole batch is in the block or none of it is). The sequence number range of the batch is recorded accordingly. Since many batches may be added to a block, the receiver tracks all the ranges of sequence numbers that are added to a block.
- When the block is ready to be pushed, the block is pushed and the ranges are reported as metadata of the block. In addition, the ranges are used to find the latest sequence number for each shard that can be checkpointed through DynamoDB.
- Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence number for its own shard.
- The array of ranges in the block metadata is used to create KinesisBackedBlockRDDs. The ReceiverInputDStream has been slightly refactored to allow the creation of KinesisBackedBlockRDDs instead of the WALBackedBlockRDDs.
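
The bookkeeping described in the list above can be sketched roughly as follows. This is a simplified, standalone illustration and not the code in this commit: `SeqRange`, `RangeTracker`, and all of their method names are hypothetical, block ids are plain `Long` values, and a single lock stands in for the synchronization that the real receiver gets from the BlockGenerator and from synchronized maps.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a per-batch sequence number range (not the commit's class).
final case class SeqRange(shardId: String, fromSeqNumber: String, toSeqNumber: String)

// Hypothetical tracker mirroring the three steps: add batch, seal block, store block.
final class RangeTracker {
  // Ranges of the batches added to the block that is currently being built.
  private val rangesInCurrentBlock = mutable.ArrayBuffer.empty[SeqRange]
  // Ranges recorded for each sealed block that has not been stored yet.
  private val blockToRanges = mutable.HashMap.empty[Long, Seq[SeqRange]]
  // Latest sequence number per shard that is known to be stored.
  private val latestStored = mutable.HashMap.empty[String, String]

  // A batch goes into the current block as a whole, so one range describes it.
  def addBatch(shardId: String, seqNumbers: Seq[String]): Unit = this.synchronized {
    if (seqNumbers.nonEmpty) {
      rangesInCurrentBlock += SeqRange(shardId, seqNumbers.head, seqNumbers.last)
    }
  }

  // When the block is sealed, move its ranges aside and start a fresh buffer.
  def sealBlock(blockId: Long): Unit = this.synchronized {
    blockToRanges(blockId) = rangesInCurrentBlock.toList
    rangesInCurrentBlock.clear()
  }

  // Once the block is stored, its ranges become block metadata and advance
  // the per-shard sequence number that is safe to checkpoint.
  def blockStored(blockId: Long): Seq[SeqRange] = this.synchronized {
    val ranges = blockToRanges.remove(blockId).getOrElse(Nil)
    ranges.foreach { r => latestStored(r.shardId) = r.toSeqNumber }
    ranges
  }

  // Sequence number a record processor could checkpoint for its shard, if any.
  def latestToCheckpoint(shardId: String): Option[String] = this.synchronized {
    latestStored.get(shardId)
  }
}
```

In this sketch a shard has nothing to checkpoint until a block containing its data has been reported as stored, which is the property that lets checkpoints lag behind received data without skipping unstored records.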

Things to be done
- [x] Add new test to verify that the sequence numbers are recovered.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #7825 from tdas/kinesis-receiver and squashes the following commits:

2159be9 [Tathagata Das] Fixed bug
569be83 [Tathagata Das] Fix scala style issue
bf31e22 [Tathagata Das] Added more documentation to make the kinesis test endpoint more configurable
3ad8361 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into kinesis-receiver
c693a63 [Tathagata Das] Removed unnecessary constructor params from KinesisTestUtils
e1f1d0a [Tathagata Das] Addressed PR comments
b9fa6bf [Tathagata Das] Fix serialization issues
f8b7680 [Tathagata Das] Updated doc
33fe43a [Tathagata Das] Added more tests
7997138 [Tathagata Das] Fix style errors
a806710 [Tathagata Das] Fixed unit test and use KinesisInputDStream
40a1709 [Tathagata Das] Fixed KinesisReceiverSuite tests
7e44df6 [Tathagata Das] Added documentation and fixed checkpointing
096383f [Tathagata Das] Added test, and addressed some of the comments.
84a7892 [Tathagata Das] fixed scala style issue
e19e37d [Tathagata Das] Added license
1cd7b66 [Tathagata Das] Updated kinesis receiver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2a71f07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2a71f07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2a71f07

Branch: refs/heads/master
Commit: c2a71f0714b7a6ab30c1c4998f606f782428971c
Parents: 781c8d7
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Wed Aug 5 00:20:26 2015 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Wed Aug 5 00:20:26 2015 -0700

----------------------------------------------------------------------
 .../kinesis/KinesisBackedBlockRDD.scala         |  20 +-
 .../streaming/kinesis/KinesisInputDStream.scala |  71 ++++++
 .../streaming/kinesis/KinesisReceiver.scala     | 195 ++++++++++++++--
 .../kinesis/KinesisRecordProcessor.scala        |  76 +++----
 .../streaming/kinesis/KinesisTestUtils.scala    |  63 ++++--
 .../spark/streaming/kinesis/KinesisUtils.scala  |  21 +-
 .../kinesis/KinesisBackedBlockRDDSuite.scala    |  18 +-
 .../streaming/kinesis/KinesisFunSuite.scala     |   4 +-
 .../kinesis/KinesisReceiverSuite.scala          |  41 ++--
 .../streaming/kinesis/KinesisStreamSuite.scala  | 222 +++++++++++++++----
 .../dstream/ReceiverInputDStream.scala          |  71 +++---
 11 files changed, 605 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c2a71f07/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 8f144a4..a003ddf 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -37,16 +37,18 @@ case class SequenceNumberRange(
 
 /** Class representing an array of Kinesis sequence number ranges */
 private[kinesis]
-case class SequenceNumberRanges(ranges: Array[SequenceNumberRange]) {
+case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
   def isEmpty(): Boolean = ranges.isEmpty
+
   def nonEmpty(): Boolean = ranges.nonEmpty
+
   override def toString(): String = ranges.mkString("SequenceNumberRanges(", ", ", ")")
 }
 
 private[kinesis]
 object SequenceNumberRanges {
   def apply(range: SequenceNumberRange): SequenceNumberRanges = {
-    new SequenceNumberRanges(Array(range))
+    new SequenceNumberRanges(Seq(range))
   }
 }
 
@@ -66,14 +68,14 @@ class KinesisBackedBlockRDDPartition(
  */
 private[kinesis]
 class KinesisBackedBlockRDD(
-    sc: SparkContext,
-    regionId: String,
-    endpointUrl: String,
+    @transient sc: SparkContext,
+    val regionName: String,
+    val endpointUrl: String,
     @transient blockIds: Array[BlockId],
-    @transient arrayOfseqNumberRanges: Array[SequenceNumberRanges],
+    @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
     @transient isBlockIdValid: Array[Boolean] = Array.empty,
-    retryTimeoutMs: Int = 10000,
-    awsCredentialsOption: Option[SerializableAWSCredentials] = None
+    val retryTimeoutMs: Int = 10000,
+    val awsCredentialsOption: Option[SerializableAWSCredentials] = None
   ) extends BlockRDD[Array[Byte]](sc, blockIds) {
 
   require(blockIds.length == arrayOfseqNumberRanges.length,
@@ -104,7 +106,7 @@ class KinesisBackedBlockRDD(
       }
       partition.seqNumberRanges.ranges.iterator.flatMap { range =>
         new KinesisSequenceRangeIterator(
-          credenentials, endpointUrl, regionId, range, retryTimeoutMs)
+          credenentials, endpointUrl, regionName, range, retryTimeoutMs)
       }
     }
     if (partition.isBlockIdValid) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c2a71f07/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
new file mode 100644
index 0000000..2e4204d
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.streaming.{Duration, StreamingContext, Time}
+
+private[kinesis] class KinesisInputDStream(
+    @transient _ssc: StreamingContext,
+    streamName: String,
+    endpointUrl: String,
+    regionName: String,
+    initialPositionInStream: InitialPositionInStream,
+    checkpointAppName: String,
+    checkpointInterval: Duration,
+    storageLevel: StorageLevel,
+    awsCredentialsOption: Option[SerializableAWSCredentials]
+  ) extends ReceiverInputDStream[Array[Byte]](_ssc) {
+
+  private[streaming]
+  override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[Array[Byte]] = {
+
+    // This returns true even for when blockInfos is empty
+    val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
+
+    if (allBlocksHaveRanges) {
+      // Create a KinesisBackedBlockRDD, even when there are no blocks
+      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
+      val seqNumRanges = blockInfos.map {
+        _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
+      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+      logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
+          s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
+      new KinesisBackedBlockRDD(
+        context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
+        isBlockIdValid = isBlockIdValid,
+        retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
+        awsCredentialsOption = awsCredentialsOption)
+    } else {
+      logWarning("Kinesis sequence number information was not present with some block metadata," +
+        " it may not be possible to recover from failures")
+      super.createBlockRDD(time, blockInfos)
+    }
+  }
+
+  override def getReceiver(): Receiver[Array[Byte]] = {
+    new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
+      checkpointAppName, checkpointInterval, storageLevel, awsCredentialsOption)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c2a71f07/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 1a8a4ce..a4baeec 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -18,17 +18,20 @@ package org.apache.spark.streaming.kinesis
 
 import java.util.UUID
 
+import scala.collection.JavaConversions.asScalaIterator
+import scala.collection.mutable
 import scala.util.control.NonFatal
 
-import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory}
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
+import com.amazonaws.services.kinesis.model.Record
 
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkEnv}
 
 
 private[kinesis]
@@ -42,38 +45,47 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
  * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
  * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
  * https://github.com/awslabs/amazon-kinesis-client
- * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
- *   http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * Instances of this class will get shipped to the Spark Streaming Workers to run within a
- *   Spark Executor.
  *
- * @param appName  Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
- *                 by the Kinesis Client Library.  If you change the App name or Stream name,
- *                 the KCL will throw errors.  This usually requires deleting the backing
- *                 DynamoDB table with the same name this Kinesis application.
+ * The way this Receiver works is as follows:
+ * - The receiver starts a KCL Worker, which is essentially runs a threadpool of multiple
+ *   KinesisRecordProcessor
+ * - Each KinesisRecordProcessor receives data from a Kinesis shard in batches. Each batch is
+ *   inserted into a Block Generator, and the corresponding range of sequence numbers is recorded.
+ * - When the block generator defines a block, then the recorded sequence number ranges that were
+ *   inserted into the block are recorded separately for being used later.
+ * - When the block is ready to be pushed, the block is pushed and the ranges are reported as
+ *   metadata of the block. In addition, the ranges are used to find out the latest sequence
+ *   number for each shard that can be checkpointed through the DynamoDB.
+ * - Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence
+ *   number for it own shard.
+ *
  * @param streamName   Kinesis stream name
  * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
  * @param regionName  Region name used by the Kinesis Client Library for
  *                    DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
- *                            See the Kinesis Spark Streaming documentation for more
- *                            details on the different types of checkpoints.
  * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
  *                                 worker's initial starting position in the stream.
  *                                 The values are either the beginning of the stream
  *                                 per Kinesis' limit of 24 hours
  *                                 (InitialPositionInStream.TRIM_HORIZON) or
  *                                 the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointAppName  Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
+ *                 by the Kinesis Client Library.  If you change the App name or Stream name,
+ *                 the KCL will throw errors.  This usually requires deleting the backing
+ *                 DynamoDB table with the same name this Kinesis application.
+ * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+ *                            See the Kinesis Spark Streaming documentation for more
+ *                            details on the different types of checkpoints.
  * @param storageLevel Storage level to use for storing the received objects
  * @param awsCredentialsOption Optional AWS credentials, used when user directly specifies
  *                             the credentials
  */
 private[kinesis] class KinesisReceiver(
-    appName: String,
-    streamName: String,
+    val streamName: String,
     endpointUrl: String,
     regionName: String,
     initialPositionInStream: InitialPositionInStream,
+    checkpointAppName: String,
     checkpointInterval: Duration,
     storageLevel: StorageLevel,
     awsCredentialsOption: Option[SerializableAWSCredentials]
@@ -90,7 +102,7 @@ private[kinesis] class KinesisReceiver(
    * workerId is used by the KCL should be based on the ip address of the actual Spark Worker
    * where this code runs (not the driver's IP address.)
    */
-  private var workerId: String = null
+  @volatile private var workerId: String = null
 
   /**
    * Worker is the core client abstraction from the Kinesis Client Library (KCL).
@@ -98,22 +110,40 @@ private[kinesis] class KinesisReceiver(
    * Each shard is assigned its own IRecordProcessor and the worker run multiple such
    * processors.
    */
-  private var worker: Worker = null
+  @volatile private var worker: Worker = null
+  @volatile private var workerThread: Thread = null
 
-  /** Thread running the worker */
-  private var workerThread: Thread = null
+  /** BlockGenerator used to generates blocks out of Kinesis data */
+  @volatile private var blockGenerator: BlockGenerator = null
 
   /**
+   * Sequence number ranges added to the current block being generated.
+   * Accessing and updating of this map is synchronized by locks in BlockGenerator.
+   */
+  private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]
+
+  /** Sequence number ranges of data added to each generated block */
+  private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
+    with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
+
+  /**
+   * Latest sequence number ranges that have been stored successfully.
+   * This is used for checkpointing through KCL */
+  private val shardIdToLatestStoredSeqNum = new mutable.HashMap[String, String]
+    with mutable.SynchronizedMap[String, String]
+  /**
    * This is called when the KinesisReceiver starts and must be non-blocking.
    * The KCL creates and manages the receiving/processing thread pool through Worker.run().
    */
   override def onStart() {
+    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, SparkEnv.get.conf)
+
     workerId = Utils.localHostName() + ":" + UUID.randomUUID()
 
     // KCL config instance
     val awsCredProvider = resolveAWSCredentialsProvider()
     val kinesisClientLibConfiguration =
-      new KinesisClientLibConfiguration(appName, streamName, awsCredProvider, workerId)
+      new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
       .withKinesisEndpoint(endpointUrl)
       .withInitialPositionInStream(initialPositionInStream)
       .withTaskBackoffTimeMillis(500)
@@ -141,6 +171,10 @@ private[kinesis] class KinesisReceiver(
         }
       }
     }
+
+    blockIdToSeqNumRanges.clear()
+    blockGenerator.start()
+
     workerThread.setName(s"Kinesis Receiver ${streamId}")
     workerThread.setDaemon(true)
     workerThread.start()
@@ -165,6 +199,81 @@ private[kinesis] class KinesisReceiver(
     workerId = null
   }
 
+  /** Add records of the given shard to the current block being generated */
+  private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = {
+    if (records.size > 0) {
+      val dataIterator = records.iterator().map { record =>
+        val byteBuffer = record.getData()
+        val byteArray = new Array[Byte](byteBuffer.remaining())
+        byteBuffer.get(byteArray)
+        byteArray
+      }
+      val metadata = SequenceNumberRange(streamName, shardId,
+        records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
+      blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
+
+    }
+  }
+
+  /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
+  private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
+    shardIdToLatestStoredSeqNum.get(shardId)
+  }
+
+  /**
+   * Remember the range of sequence numbers that was added to the currently active block.
+   * Internally, this is synchronized with `finalizeRangesForCurrentBlock()`.
+   */
+  private def rememberAddedRange(range: SequenceNumberRange): Unit = {
+    seqNumRangesInCurrentBlock += range
+  }
+
+  /**
+   * Finalize the ranges added to the block that was active and prepare the ranges buffer
+   * for next block. Internally, this is synchronized with `rememberAddedRange()`.
+   */
+  private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
+    blockIdToSeqNumRanges(blockId) = SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray)
+    seqNumRangesInCurrentBlock.clear()
+    logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
+  }
+
+  /** Store the block along with its associated ranges */
+  private def storeBlockWithRanges(
+      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[Array[Byte]]): Unit = {
+    val rangesToReportOption = blockIdToSeqNumRanges.remove(blockId)
+    if (rangesToReportOption.isEmpty) {
+      stop("Error while storing block into Spark, could not find sequence number ranges " +
+        s"for block $blockId")
+      return
+    }
+
+    val rangesToReport = rangesToReportOption.get
+    var attempt = 0
+    var stored = false
+    var throwable: Throwable = null
+    while (!stored && attempt <= 3) {
+      try {
+        store(arrayBuffer, rangesToReport)
+        stored = true
+      } catch {
+        case NonFatal(th) =>
+          attempt += 1
+          throwable = th
+      }
+    }
+    if (!stored) {
+      stop("Error while storing block into Spark", throwable)
+    }
+
+    // Update the latest sequence number that have been successfully stored for each shard
+    // Note that we are doing this sequentially because the array of sequence number ranges
+    // is assumed to be
+    rangesToReport.ranges.foreach { range =>
+      shardIdToLatestStoredSeqNum(range.shardId) = range.toSeqNumber
+    }
+  }
+
   /**
    * If AWS credential is provided, return a AWSCredentialProvider returning that credential.
    * Otherwise, return the DefaultAWSCredentialsProviderChain.
@@ -182,4 +291,46 @@ private[kinesis] class KinesisReceiver(
         new DefaultAWSCredentialsProviderChain()
     }
   }
+
+
+  /**
+   * Class to handle blocks generated by this receiver's block generator. Specifically, in
+   * the context of the Kinesis Receiver, this handler does the following.
+   *
+   * - When an array of records is added to the current active block in the block generator,
+   *   this handler keeps track of the corresponding sequence number range.
+   * - When the currently active block is ready to sealed (not more records), this handler
+   *   keep track of the list of ranges added into this block in another H
+   */
+  private class GeneratedBlockHandler extends BlockGeneratorListener {
+
+    /**
+     * Callback method called after a data item is added into the BlockGenerator.
+     * The data addition, block generation, and calls to onAddData and onGenerateBlock
+     * are all synchronized through the same lock.
+     */
+    def onAddData(data: Any, metadata: Any): Unit = {
+      rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
+    }
+
+    /**
+     * Callback method called after a block has been generated.
+     * The data addition, block generation, and calls to onAddData and onGenerateBlock
+     * are all synchronized through the same lock.
+     */
+    def onGenerateBlock(blockId: StreamBlockId): Unit = {
+      finalizeRangesForCurrentBlock(blockId)
+    }
+
+    /** Callback method called when a block is ready to be pushed / stored. */
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+      storeBlockWithRanges(blockId,
+        arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Array[Byte]]])
+    }
+
+    /** Callback called in case of any error in internal of the BlockGenerator */
+    def onError(message: String, throwable: Throwable): Unit = {
+      reportError(message, throwable)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c2a71f07/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index fe9e3a0..b240512 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -18,20 +18,16 @@ package org.apache.spark.streaming.kinesis
 
 import java.util.List
 
-import scala.collection.JavaConversions.asScalaBuffer
 import scala.util.Random
+import scala.util.control.NonFatal
 
-import org.apache.spark.Logging
-
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
 import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
 import com.amazonaws.services.kinesis.model.Record
 
+import org.apache.spark.Logging
+
 /**
  * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
  * This implementation operates on the Array[Byte] from the KinesisReceiver.
@@ -51,6 +47,7 @@ private[kinesis] class KinesisRecordProcessor(
     checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging {
 
   // shardId to be populated during initialize()
+  @volatile
   private var shardId: String = _
 
   /**
@@ -75,47 +72,38 @@ private[kinesis] class KinesisRecordProcessor(
   override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
     if (!receiver.isStopped()) {
       try {
-        /*
-         * Notes:
-         * 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
-         *    Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
-         *    internally-configured Spark serializer (kryo, etc).
-         * 2) This is not desirable, so we instead store a raw Array[Byte] and decouple
-         *    ourselves from Spark's internal serialization strategy.
-         * 3) For performance, the BlockGenerator is asynchronously queuing elements within its
-         *    memory before creating blocks.  This prevents the small block scenario, but requires
-         *    that you register callbacks to know when a block has been generated and stored
-         *    (WAL is sufficient for storage) before can checkpoint back to the source.
-        */
-        batch.foreach(record => receiver.store(record.getData().array()))
-
-        logDebug(s"Stored:  Worker $workerId stored ${batch.size} records for shardId $shardId")
+        receiver.addRecords(shardId, batch)
+        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
 
         /*
-         * Checkpoint the sequence number of the last record successfully processed/stored
-         *   in the batch.
-         * In this implementation, we're checkpointing after the given checkpointIntervalMillis.
-         * Note that this logic requires that processRecords() be called AND that it's time to
-         *   checkpoint.  I point this out because there is no background thread running the
-         *   checkpointer.  Checkpointing is tested and trigger only when a new batch comes in.
-         * If the worker is shutdown cleanly, checkpoint will happen (see shutdown() below).
-         * However, if the worker dies unexpectedly, a checkpoint may not happen.
-         * This could lead to records being processed more than once.
+         *
+         * Checkpoint the sequence number of the last record successfully stored.
+         * Note that in this current implementation, the checkpointing occurs only when after
+         * checkpointIntervalMillis from the last checkpoint, AND when there is new record
+         * to process. This leads to the checkpointing lagging behind what records have been
+         * stored by the receiver. Ofcourse, this can lead records processed more than once,
+         * under failures and restarts.
+         *
+         * TODO: Instead of checkpointing here, run a separate timer task to perform
+         * checkpointing so that it checkpoints in a timely manner independent of whether
+         * new records are available or not.
          */
         if (checkpointState.shouldCheckpoint()) {
-          /* Perform the checkpoint */
-          KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+          receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum =>
+            /* Perform the checkpoint */
+            KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(latestSeqNum), 4, 100)
 
-          /* Update the next checkpoint time */
-          checkpointState.advanceCheckpoint()
+            /* Update the next checkpoint time */
+            checkpointState.advanceCheckpoint()
 
-          logDebug(s"Checkpoint:  WorkerId $workerId completed checkpoint of ${batch.size}" +
+            logDebug(s"Checkpoint:  WorkerId $workerId completed checkpoint of ${batch.size}" +
               s" records for shardId $shardId")
-          logDebug(s"Checkpoint:  Next checkpoint is at " +
+            logDebug(s"Checkpoint:  Next checkpoint is at " +
               s" ${checkpointState.checkpointClock.getTimeMillis()} for shardId $shardId")
+          }
         }
       } catch {
-        case e: Throwable => {
+        case NonFatal(e) => {
           /*
            *  If there is a failure within the batch, the batch will not be checkpointed.
            *  This will potentially cause records since the last checkpoint to be processed
@@ -130,7 +118,7 @@ private[kinesis] class KinesisRecordProcessor(
       }
     } else {
       /* RecordProcessor has been stopped. */
-      logInfo(s"Stopped:  The Spark KinesisReceiver has stopped for workerId $workerId" +
+      logInfo(s"Stopped:  KinesisReceiver has stopped for workerId $workerId" +
           s" and shardId $shardId.  No more records will be processed.")
     }
   }
@@ -154,7 +142,11 @@ private[kinesis] class KinesisRecordProcessor(
        * It's now OK to read from the new shards that resulted from a resharding event.
        */
       case ShutdownReason.TERMINATE =>
-        KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+        val latestSeqNumToCheckpointOption = receiver.getLatestSeqNumToCheckpoint(shardId)
+        if (latestSeqNumToCheckpointOption.nonEmpty) {
+          KinesisRecordProcessor.retryRandom(
+            checkpointer.checkpoint(latestSeqNumToCheckpointOption.get), 4, 100)
+        }
 
       /*
        * ZOMBIE Use Case.  NoOp.
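
Note on the checkpoint change above: the record processor now checkpoints only when the receiver reports a stored sequence number for the shard, and it checkpoints at exactly that number. The following is a minimal, self-contained sketch of that pattern, not the code from this commit. FakeReceiver, FakeCheckpointer, recordStored and checkpointIfPossible are hypothetical names standing in for KinesisReceiver and the KCL's IRecordProcessorCheckpointer; only getLatestSeqNumToCheckpoint and checkpoint(seqNum) mirror calls that appear in the diff.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the KCL checkpointer: remembers what was checkpointed.
class FakeCheckpointer {
  val checkpointed = mutable.ArrayBuffer.empty[String]
  def checkpoint(seqNum: String): Unit = { checkpointed += seqNum }
}

// Hypothetical stand-in for the receiver's per-shard bookkeeping.
class FakeReceiver {
  private val latest = mutable.HashMap.empty[String, String]

  // Assumed hook: called once a block containing this sequence number is stored.
  def recordStored(shardId: String, seqNum: String): Unit = { latest(shardId) = seqNum }

  // Mirrors the call used in the diff: None until something has been stored.
  def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = latest.get(shardId)
}

object CheckpointSketch {
  // Checkpoint only if a sequence number is known for the shard; otherwise do nothing.
  def checkpointIfPossible(
      receiver: FakeReceiver,
      checkpointer: FakeCheckpointer,
      shardId: String): Unit = {
    receiver.getLatestSeqNumToCheckpoint(shardId).foreach { seqNum =>
      checkpointer.checkpoint(seqNum)
    }
  }
}
```

Using Option.foreach keeps the "nothing stored yet" case a silent no-op, which is the same behavior the processRecords hunk relies on.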

http://git-wip-us.apache.org/repos/asf/spark/blob/c2a71f07/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 255ac27..711aade 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -36,22 +36,10 @@ import org.apache.spark.Logging
 /**
  * Shared utility methods for performing Kinesis tests that actually transfer data
  */
-private class KinesisTestUtils(val endpointUrl: String, _regionName: String) extends Logging {
-
-  def this() {
-    this("https://kinesis.us-west-2.amazonaws.com", "")
-  }
-
-  def this(endpointUrl: String) {
-    this(endpointUrl, "")
-  }
-
-  val regionName = if (_regionName.length == 0) {
-    RegionUtils.getRegionByEndpoint(endpointUrl).getName()
-  } else {
-    RegionUtils.getRegion(_regionName).getName()
-  }
+private class KinesisTestUtils extends Logging {
 
+  val endpointUrl = KinesisTestUtils.endpointUrl
+  val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
   val streamShardCount = 2
 
   private val createStreamTimeoutSeconds = 300
@@ -81,11 +69,11 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext
   }
 
   def createStream(): Unit = {
-    logInfo("Creating stream")
     require(!streamCreated, "Stream already created")
     _streamName = findNonExistentStreamName()
 
     // Create a stream. The number of shards determines the provisioned throughput.
+    logInfo(s"Creating stream ${_streamName}")
     val createStreamRequest = new CreateStreamRequest()
     createStreamRequest.setStreamName(_streamName)
     createStreamRequest.setShardCount(2)
@@ -94,7 +82,7 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext
     // The stream is now being created. Wait for it to become active.
     waitForStreamToBeActive(_streamName)
     streamCreated = true
-    logInfo("Created stream")
+    logInfo(s"Created stream ${_streamName}")
   }
 
   /**
@@ -191,9 +179,38 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext
 
 private[kinesis] object KinesisTestUtils {
 
-  val envVarName = "ENABLE_KINESIS_TESTS"
+  val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
+  val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
+  val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
+
+  lazy val shouldRunTests = {
+    val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
+    if (isEnvSet) {
+      // scalastyle:off println
+      // Print this so that it is easily visible on the console and not hidden in the log4j logs.
+      println(
+        s"""
+          |Kinesis tests that actually send data have been enabled by setting the environment
+          |variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
+          |DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
+          |By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
+          |To change this endpoint URL to a different region, you can set the environment variable
+          |$endVarNameForEndpoint to the desired endpoint URL
+          |(e.g. $endVarNameForEndpoint="https://kinesis.us-east-1.amazonaws.com").
+        """.stripMargin)
+      // scalastyle:on println
+    }
+    isEnvSet
+  }
 
-  val shouldRunTests = sys.env.get(envVarName) == Some("1")
+  lazy val endpointUrl = {
+    val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
+    // scalastyle:off println
+    // Print this so that it is easily visible on the console and not hidden in the log4j logs.
+    println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
+    // scalastyle:on println
+    url
+  }
 
   def isAWSCredentialsPresent: Boolean = {
     Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
@@ -205,7 +222,13 @@ private[kinesis] object KinesisTestUtils {
     Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
       case Success(cred) => cred
       case Failure(e) =>
-        throw new Exception("Kinesis tests enabled, but could get not AWS credentials")
+        throw new Exception(
+          s"""
+             |Kinesis tests enabled using environment variable $envVarNameForEnablingTests
+             |but could not find AWS credentials. Please follow instructions in AWS documentation
+             |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
+             |can find the credentials.
+           """.stripMargin)
     }
   }
 }
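
Note on the test gating above: the tests are enabled by one environment variable and the endpoint can be overridden by another, with a default when it is unset. The sketch below shows that lookup logic in isolation, assuming the environment is passed in as a Map so it can be exercised without touching the real process environment. TestEnvSketch and its method signatures are hypothetical; the variable names and the default URL are the ones in the diff.

```scala
object TestEnvSketch {
  val enableVar = "ENABLE_KINESIS_TESTS"
  val endpointVar = "KINESIS_TEST_ENDPOINT_URL"
  val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"

  // Tests run only when the enabling variable is set to exactly "1".
  def shouldRunTests(env: Map[String, String]): Boolean =
    env.get(enableVar) == Some("1")

  // Fall back to the default endpoint when no override is present.
  def endpointUrl(env: Map[String, String]): String =
    env.getOrElse(endpointVar, defaultEndpointUrl)
}
```

In real use the argument would be sys.env, for example TestEnvSketch.shouldRunTests(sys.env).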

http://git-wip-us.apache.org/repos/asf/spark/blob/c2a71f07/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index 7dab17e..c799fad 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -65,9 +65,8 @@ object KinesisUtils {
     ): ReceiverInputDStream[Array[Byte]] = {
     // Setting scope to override receiver stream's scope of "receiver stream"
     ssc.withNamedScope("kinesis stream") {
-      ssc.receiverStream(
-        new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
-          initialPositionInStream, checkpointInterval, storageLevel, None))
+      new KinesisInputDStream(ssc, streamName, endpointUrl, validateRegion(regionName),
+        initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, None)
     }
   }
 
@@ -112,10 +111,11 @@ object KinesisUtils {
       awsAccessKeyId: String,
       awsSecretKey: String
     ): ReceiverInputDStream[Array[Byte]] = {
-    ssc.receiverStream(
-      new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
-        initialPositionInStream, checkpointInterval, storageLevel,
-        Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))))
+    ssc.withNamedScope("kinesis stream") {
+      new KinesisInputDStream(ssc, streamName, endpointUrl, validateRegion(regionName),
+        initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
+        Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
+    }
   }
 
   /**
@@ -155,9 +155,10 @@ object KinesisUtils {
       initialPositionInStream: InitialPositionInStream,
       storageLevel: StorageLevel
     ): ReceiverInputDStream[Array[Byte]] = {
-    ssc.receiverStream(
-      new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl, getRegionByEndpoint(endpointUrl),
-        initialPositionInStream, checkpointInterval, storageLevel, None))
+    ssc.withNamedScope("kinesis stream") {
+      new KinesisInputDStream(ssc, streamName, endpointUrl, getRegionByEndpoint(endpointUrl),
+        initialPositionInStream, ssc.sc.appName, checkpointInterval, storageLevel, None)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c2a71f07/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index e81fb11..a89e562 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -24,8 +24,6 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
 
 class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll {
 
-  private val regionId = "us-east-1"
-  private val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
   private val testData = 1 to 8
 
   private var testUtils: KinesisTestUtils = null
@@ -42,7 +40,7 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
 
   override def beforeAll(): Unit = {
     runIfTestsEnabled("Prepare KinesisTestUtils") {
-      testUtils = new KinesisTestUtils(endpointUrl)
+      testUtils = new KinesisTestUtils()
       testUtils.createStream()
 
       shardIdToDataAndSeqNumbers = testUtils.pushData(testData)
@@ -75,21 +73,21 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
 
   testIfEnabled("Basic reading from Kinesis") {
     // Verify all data using multiple ranges in a single RDD partition
-    val receivedData1 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
+    val receivedData1 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl,
       fakeBlockIds(1),
       Array(SequenceNumberRanges(allRanges.toArray))
     ).map { bytes => new String(bytes).toInt }.collect()
     assert(receivedData1.toSet === testData.toSet)
 
     // Verify all data using one range in each of the multiple RDD partitions
-    val receivedData2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
+    val receivedData2 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl,
       fakeBlockIds(allRanges.size),
       allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
     ).map { bytes => new String(bytes).toInt }.collect()
     assert(receivedData2.toSet === testData.toSet)
 
     // Verify ordering within each partition
-    val receivedData3 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
+    val receivedData3 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl,
       fakeBlockIds(allRanges.size),
       allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
     ).map { bytes => new String(bytes).toInt }.collectPartitions()
@@ -211,7 +209,8 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
       }, "Incorrect configuration of RDD, unexpected ranges set"
     )
 
-    val rdd = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds, ranges)
+    val rdd = new KinesisBackedBlockRDD(
+      sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges)
     val collectedData = rdd.map { bytes =>
       new String(bytes).toInt
     }.collect()
@@ -224,8 +223,9 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
     if (testIsBlockValid) {
       require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
       require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis")
-      val rdd2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds.toArray,
-        ranges, isBlockIdValid = Array.fill(blockIds.length)(false))
+      val rdd2 = new KinesisBackedBlockRDD(
+        sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges,
+        isBlockIdValid = Array.fill(blockIds.length)(false))
       intercept[SparkException] {
         rdd2.collect()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/c2a71f07/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
index 8373138..ee428f3 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
@@ -31,7 +31,7 @@ trait KinesisFunSuite extends SparkFunSuite  {
     if (shouldRunTests) {
       test(testName)(testBody)
     } else {
-      ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody)
+      ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody)
     }
   }
 
@@ -40,7 +40,7 @@ trait KinesisFunSuite extends SparkFunSuite  {
     if (shouldRunTests) {
       body
     } else {
-      ignore(s"$message [enable by setting env var $envVarName=1]")()
+      ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c2a71f07/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 98f2c7c..ceb135e 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -22,15 +22,14 @@ import scala.collection.JavaConversions.seqAsJavaList
 
 import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
 import com.amazonaws.services.kinesis.model.Record
+import org.mockito.Matchers._
 import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfter, Matchers}
 
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.{Milliseconds, TestSuiteBase}
 import org.apache.spark.util.{Clock, ManualClock, Utils}
 
 /**
@@ -44,6 +43,8 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
   val endpoint = "endpoint-url"
   val workerId = "dummyWorkerId"
   val shardId = "dummyShardId"
+  val seqNum = "dummySeqNum"
+  val someSeqNum = Some(seqNum)
 
   val record1 = new Record()
   record1.setData(ByteBuffer.wrap("Spark In Action".getBytes()))
@@ -80,16 +81,18 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
 
   test("process records including store and checkpoint") {
     when(receiverMock.isStopped()).thenReturn(false)
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
     when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)
 
     val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+    recordProcessor.initialize(shardId)
     recordProcessor.processRecords(batch, checkpointerMock)
 
     verify(receiverMock, times(1)).isStopped()
-    verify(receiverMock, times(1)).store(record1.getData().array())
-    verify(receiverMock, times(1)).store(record2.getData().array())
+    verify(receiverMock, times(1)).addRecords(shardId, batch)
+    verify(receiverMock, times(1)).getLatestSeqNumToCheckpoint(shardId)
     verify(checkpointStateMock, times(1)).shouldCheckpoint()
-    verify(checkpointerMock, times(1)).checkpoint()
+    verify(checkpointerMock, times(1)).checkpoint(anyString)
     verify(checkpointStateMock, times(1)).advanceCheckpoint()
   }
 
@@ -100,19 +103,25 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
     recordProcessor.processRecords(batch, checkpointerMock)
 
     verify(receiverMock, times(1)).isStopped()
+    verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record]))
+    verify(checkpointerMock, never).checkpoint(anyString)
   }
 
   test("shouldn't checkpoint when exception occurs during store") {
     when(receiverMock.isStopped()).thenReturn(false)
-    when(receiverMock.store(record1.getData().array())).thenThrow(new RuntimeException())
+    when(
+      receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
+    ).thenThrow(new RuntimeException())
 
     intercept[RuntimeException] {
       val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+      recordProcessor.initialize(shardId)
       recordProcessor.processRecords(batch, checkpointerMock)
     }
 
     verify(receiverMock, times(1)).isStopped()
-    verify(receiverMock, times(1)).store(record1.getData().array())
+    verify(receiverMock, times(1)).addRecords(shardId, batch)
+    verify(checkpointerMock, never).checkpoint(anyString)
   }
 
   test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
@@ -158,19 +167,25 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
   }
 
   test("shutdown should checkpoint if the reason is TERMINATE") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
     val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
-    val reason = ShutdownReason.TERMINATE
-    recordProcessor.shutdown(checkpointerMock, reason)
+    recordProcessor.initialize(shardId)
+    recordProcessor.shutdown(checkpointerMock, ShutdownReason.TERMINATE)
 
-    verify(checkpointerMock, times(1)).checkpoint()
+    verify(receiverMock, times(1)).getLatestSeqNumToCheckpoint(shardId)
+    verify(checkpointerMock, times(1)).checkpoint(anyString)
   }
 
   test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
     val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+    recordProcessor.initialize(shardId)
     recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
     recordProcessor.shutdown(checkpointerMock, null)
 
-    verify(checkpointerMock, never()).checkpoint()
+    verify(checkpointerMock, never).checkpoint(anyString)
   }
 
   test("retry success on first attempt") {

http://git-wip-us.apache.org/repos/asf/spark/blob/c2a71f07/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index b88c9c6..1177dc7 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -22,34 +22,67 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
 
+import com.amazonaws.regions.RegionUtils
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.scalatest.Matchers._
 import org.scalatest.concurrent.Eventually
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.streaming.kinesis.KinesisTestUtils._
+import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
+import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkConf, SparkContext}
 
 class KinesisStreamSuite extends KinesisFunSuite
   with Eventually with BeforeAndAfter with BeforeAndAfterAll {
 
-  // This is the name that KCL uses to save metadata to DynamoDB
-  private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
+  // This is the name that KCL will use to save metadata to DynamoDB
+  private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
+  private val batchDuration = Seconds(1)
 
-  private var ssc: StreamingContext = _
-  private var sc: SparkContext = _
+  // Dummy parameters for API testing
+  private val dummyEndpointUrl = defaultEndpointUrl
+  private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName()
+  private val dummyAWSAccessKey = "dummyAccessKey"
+  private val dummyAWSSecretKey = "dummySecretKey"
+
+  private var testUtils: KinesisTestUtils = null
+  private var ssc: StreamingContext = null
+  private var sc: SparkContext = null
 
   override def beforeAll(): Unit = {
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
     sc = new SparkContext(conf)
+
+    runIfTestsEnabled("Prepare KinesisTestUtils") {
+      testUtils = new KinesisTestUtils()
+      testUtils.createStream()
+    }
   }
 
   override def afterAll(): Unit = {
-    sc.stop()
-    // Delete the Kinesis stream as well as the DynamoDB table generated by
-    // Kinesis Client Library when consuming the stream
+    if (ssc != null) {
+      ssc.stop()
+    }
+    if (sc != null) {
+      sc.stop()
+    }
+    if (testUtils != null) {
+      // Delete the Kinesis stream as well as the DynamoDB table generated by
+      // Kinesis Client Library when consuming the stream
+      testUtils.deleteStream()
+      testUtils.deleteDynamoDBTable(appName)
+    }
+  }
+
+  before {
+    ssc = new StreamingContext(sc, batchDuration)
   }
 
   after {
@@ -57,21 +90,75 @@ class KinesisStreamSuite extends KinesisFunSuite
       ssc.stop(stopSparkContext = false)
       ssc = null
     }
+    if (testUtils != null) {
+      testUtils.deleteDynamoDBTable(appName)
+    }
   }
 
   test("KinesisUtils API") {
-    ssc = new StreamingContext(sc, Seconds(1))
     // Tests the API, does not actually test data receiving
     val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
-      "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
+      dummyEndpointUrl, Seconds(2),
       InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
     val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
-      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+      dummyEndpointUrl, dummyRegionName,
       InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
     val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
-      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+      dummyEndpointUrl, dummyRegionName,
       InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
-      "awsAccessKey", "awsSecretKey")
+      dummyAWSAccessKey, dummyAWSSecretKey)
+  }
+
+  test("RDD generation") {
+    val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream",
+      dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2),
+      StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey)
+    assert(inputStream.isInstanceOf[KinesisInputDStream])
+
+    val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream]
+    val time = Time(1000)
+
+    // Generate block info data for testing
+    val seqNumRanges1 = SequenceNumberRanges(
+      SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
+    val blockId1 = StreamBlockId(kinesisStream.id, 123)
+    val blockInfo1 = ReceivedBlockInfo(
+      0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
+
+    val seqNumRanges2 = SequenceNumberRanges(
+      SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
+    val blockId2 = StreamBlockId(kinesisStream.id, 345)
+    val blockInfo2 = ReceivedBlockInfo(
+      0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
+
+    // Verify that the generated KinesisBackedBlockRDD has all the right information
+    val blockInfos = Seq(blockInfo1, blockInfo2)
+    val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
+    nonEmptyRDD shouldBe a [KinesisBackedBlockRDD]
+    val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD]
+    assert(kinesisRDD.regionName === dummyRegionName)
+    assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
+    assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
+    assert(kinesisRDD.awsCredentialsOption ===
+      Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey)))
+    assert(nonEmptyRDD.partitions.size === blockInfos.size)
+    nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] }
+    val partitions = nonEmptyRDD.partitions.map {
+      _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq
+    assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2))
+    assert(partitions.map { _.blockId } === Seq(blockId1, blockId2))
+    assert(partitions.forall { _.isBlockIdValid === true })
+
+    // Verify that KinesisBackedBlockRDD is generated even when there are no blocks
+    val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
+    emptyRDD shouldBe a [KinesisBackedBlockRDD]
+    emptyRDD.partitions shouldBe empty
+
+    // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
+    blockInfos.foreach { _.setBlockIdInvalid() }
+    kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition =>
+      assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false)
+    }
   }
 
 
@@ -84,32 +171,91 @@ class KinesisStreamSuite extends KinesisFunSuite
    * and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
    */
   testIfEnabled("basic operation") {
-    val kinesisTestUtils = new KinesisTestUtils()
-    try {
-      kinesisTestUtils.createStream()
-      ssc = new StreamingContext(sc, Seconds(1))
-      val awsCredentials = KinesisTestUtils.getAWSCredentials()
-      val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
-        kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
-        Seconds(10), StorageLevel.MEMORY_ONLY,
-        awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
-
-      val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
-      stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
-        collected ++= rdd.collect()
-        logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
-      }
-      ssc.start()
+    val awsCredentials = KinesisTestUtils.getAWSCredentials()
+    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
+      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
+      Seconds(10), StorageLevel.MEMORY_ONLY,
+      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
 
-      val testData = 1 to 10
-      eventually(timeout(120 seconds), interval(10 second)) {
-        kinesisTestUtils.pushData(testData)
-        assert(collected === testData.toSet, "\nData received does not match data sent")
+    val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+    stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
+      collected ++= rdd.collect()
+      logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
+    }
+    ssc.start()
+
+    val testData = 1 to 10
+    eventually(timeout(120 seconds), interval(10 second)) {
+      testUtils.pushData(testData)
+      assert(collected === testData.toSet, "\nData received does not match data sent")
+    }
+    ssc.stop(stopSparkContext = false)
+  }
+
+  testIfEnabled("failure recovery") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    val checkpointDir = Utils.createTempDir().getAbsolutePath
+
+    ssc = new StreamingContext(sc, Milliseconds(1000))
+    ssc.checkpoint(checkpointDir)
+
+    val awsCredentials = KinesisTestUtils.getAWSCredentials()
+    val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
+      with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
+
+    val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
+      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
+      Seconds(10), StorageLevel.MEMORY_ONLY,
+      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+    // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
+    kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
+      val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD]
+      val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
+      collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+    })
+
+    ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
+    ssc.start()
+
+    def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
+
+    def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
+
+    // Run until there are more than 10 batches with some data in them.
+    // If this times out because numBatchesWithData stays at 0, it is likely that the function
+    // passed to foreachRDD failed with exceptions and nothing got added to `collectedData`.
+    eventually(timeout(2 minutes), interval(1 seconds)) {
+      testUtils.pushData(1 to 5)
+      assert(isCheckpointPresent && numBatchesWithData > 10)
+    }
+    ssc.stop(stopSparkContext = true)  // stop the SparkContext so that the blocks are not reused
+
+    // Restart the context from the checkpoint
+    logInfo("Restarting from checkpoint")
+    ssc = new StreamingContext(checkpointDir)
+    ssc.start()
+    val recoveredKinesisStream = ssc.graph.getInputStreams().head
+
+    // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
+    // and return the same data
+    val times = collectedData.keySet
+    times.foreach { time =>
+      val (arrayOfSeqNumRanges, data) = collectedData(time)
+      val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
+      rdd shouldBe a [KinesisBackedBlockRDD]
+
+      // Verify the recovered sequence ranges
+      val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD]
+      assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
+      arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
+        assert(expected.ranges.toSeq === found.ranges.toSeq)
       }
-      ssc.stop()
-    } finally {
-      kinesisTestUtils.deleteStream()
-      kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+
+      // Verify the recovered data
+      assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
     }
+    ssc.stop()
   }
+
 }
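
Note on the range check in the test above: the size assertion before the `zip` matters, because `zip` silently truncates to the shorter collection and a missing range would otherwise go unnoticed. A minimal standalone sketch of the same comparison follows. `SeqRange` and `SeqRanges` are hypothetical stand-ins invented for illustration (field names are assumptions), not the classes used by the Kinesis module.

```scala
// Hypothetical stand-ins for the per-block sequence number range types.
// Names and fields are assumptions made for this sketch only.
final case class SeqRange(shardId: String, fromSeqNumber: String, toSeqNumber: String)
final case class SeqRanges(ranges: Seq[SeqRange])

object RangeCheck {
  // True only if both sides hold the same number of blocks and every
  // block carries the same ranges, in the same order.
  def sameRanges(expected: Seq[SeqRanges], found: Seq[SeqRanges]): Boolean =
    expected.size == found.size &&
      expected.zip(found).forall { case (e, f) => e.ranges == f.ranges }

  def main(args: Array[String]): Unit = {
    val original  = Seq(SeqRanges(Seq(SeqRange("shard-0", "1", "5"))))
    val recovered = Seq(SeqRanges(Seq(SeqRange("shard-0", "1", "5"))))
    val drifted   = Seq(SeqRanges(Seq(SeqRange("shard-0", "1", "6"))))

    assert(sameRanges(original, recovered))
    assert(!sameRanges(original, drifted))
    // Size mismatch: zip alone would compare only the first element and pass.
    assert(!sameRanges(original, original ++ original))
  }
}
```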

http://git-wip-us.apache.org/repos/asf/spark/blob/c2a71f07/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 670ef8d..a158009 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -21,12 +21,12 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.BlockId
-import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, RateController, StreamInputInfo}
 import org.apache.spark.streaming.util.WriteAheadLogUtils
+import org.apache.spark.streaming.{StreamingContext, Time}
 
 /**
  * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -79,48 +79,55 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
         // for this batch
         val receiverTracker = ssc.scheduler.receiverTracker
         val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
-        val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
 
         // Register the input blocks information into InputInfoTracker
         val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
         ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
 
-        if (blockInfos.nonEmpty) {
-          // Are WAL record handles present with all the blocks
-          val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
+        // Create the BlockRDD
+        createBlockRDD(validTime, blockInfos)
+      }
+    }
+    Some(blockRDD)
+  }
 
-          if (areWALRecordHandlesPresent) {
-            // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
-            val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
-            val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
-            new WriteAheadLogBackedBlockRDD[T](
-              ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
-          } else {
-            // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
-            // others then that is unexpected and log a warning accordingly.
-            if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
-              if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
-                logError("Some blocks do not have Write Ahead Log information; " +
-                  "this is unexpected and data may not be recoverable after driver failures")
-              } else {
-                logWarning("Some blocks have Write Ahead Log information; this is unexpected")
-              }
-            }
-            new BlockRDD[T](ssc.sc, blockIds)
-          }
-        } else {
-          // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
-          // according to the configuration
+  private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
+
+    if (blockInfos.nonEmpty) {
+      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
+
+      // Are WAL record handles present with all the blocks
+      val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
+
+      if (areWALRecordHandlesPresent) {
+        // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
+        val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+        val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
+        new WriteAheadLogBackedBlockRDD[T](
+          ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
+      } else {
+        // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
+        // others then that is unexpected and log a warning accordingly.
+        if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
           if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
-            new WriteAheadLogBackedBlockRDD[T](
-              ssc.sparkContext, Array.empty, Array.empty, Array.empty)
+            logError("Some blocks do not have Write Ahead Log information; " +
+              "this is unexpected and data may not be recoverable after driver failures")
           } else {
-            new BlockRDD[T](ssc.sc, Array.empty)
+            logWarning("Some blocks have Write Ahead Log information; this is unexpected")
           }
         }
+        new BlockRDD[T](ssc.sc, blockIds)
+      }
+    } else {
+      // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
+      // according to the configuration
+      if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
+        new WriteAheadLogBackedBlockRDD[T](
+          ssc.sparkContext, Array.empty, Array.empty, Array.empty)
+      } else {
+        new BlockRDD[T](ssc.sc, Array.empty)
       }
     }
-    Some(blockRDD)
   }
 
   /**
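
Note on the hunk above: the logic moved into `createBlockRDD` is unchanged, only extracted so that a subclass can presumably supply its own RDD type. The selection rule itself can be sketched without Spark as below. `BlockInfo`, `RddKind`, `WalBacked`, `Plain` and `ChooseRdd` are hypothetical names used only for this illustration; the real method returns RDD instances and also logs when only some blocks carry WAL handles.

```scala
// Simplified stand-ins, not Spark's classes. A block either carries a
// write-ahead-log handle or it does not.
final case class BlockInfo(blockId: String, walHandle: Option[String])

sealed trait RddKind
case object WalBacked extends RddKind // stands for WriteAheadLogBackedBlockRDD
case object Plain extends RddKind     // stands for BlockRDD

object ChooseRdd {
  // Mirrors the branching in the diff:
  //  - blocks present, all with WAL handles -> WAL-backed RDD
  //  - blocks present, some or none with WAL handles -> plain BlockRDD
  //  - no blocks -> decided by the receiver WAL configuration flag
  def choose(blockInfos: Seq[BlockInfo], walEnabled: Boolean): RddKind =
    if (blockInfos.nonEmpty) {
      if (blockInfos.forall(_.walHandle.nonEmpty)) WalBacked else Plain
    } else {
      if (walEnabled) WalBacked else Plain
    }

  def main(args: Array[String]): Unit = {
    val withWal    = BlockInfo("block-1", Some("wal-segment-1"))
    val withoutWal = BlockInfo("block-2", None)

    assert(choose(Seq(withWal), walEnabled = false) == WalBacked)
    assert(choose(Seq(withWal, withoutWal), walEnabled = true) == Plain)
    assert(choose(Seq.empty, walEnabled = true) == WalBacked)
    assert(choose(Seq.empty, walEnabled = false) == Plain)
  }
}
```

Note that in the mixed case the configuration flag does not change which kind is chosen; in the original code it only affects whether an error or a warning is logged.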



