spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "kevin fu (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-23991) data loss when allocateBlocksToBatch
Date Mon, 16 Apr 2018 11:35:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

kevin fu updated SPARK-23991:
-----------------------------
    Description: 
with checkpoint and WAL enabled, driver will write the allocation of blocks to batch into
hdfs. however, if it fails as following, the blocks of this batch cannot be computed by the
DAG. Because the blocks have been dequeued from the receivedBlockQueue and get lost.


{panel:title=error log}
18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing record: BatchAllocationEvent(1523765480000
ms,AllocatedBlocks(Map(0 -> ArrayBuffer()))) to the WriteAheadLog. org.apache.spark.SparkException:
Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused by: java.util.concurrent.TimeoutException:
Futures timed out after [5000 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 more
{panel}


the concerning codes are showed below:


{code:scala}
  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      // possibly processed batch job or half-processed batch job need to be processed again,
      // so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      // lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }

{code}


  was:
with checkpoint and WAL enabled, driver will write the allocation of blocks to batch into
hdfs. however, if it fails as following, the blocks of this batch cannot be computed by the
DAG. Because the blocks have been dequeued from the receivedBlockQueue and get lost.

{quote}18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing record:
BatchAllocationEvent(1523765480000 ms,AllocatedBlocks(Map(0 -> ArrayBuffer()))) to the
WriteAheadLog. org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused by: java.util.concurrent.TimeoutException:
Futures timed out after [5000 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 more{quote}

the concerning codes are showed below:

{quote}
  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      // possibly processed batch job or half-processed batch job need to be processed again,
      // so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      // lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
{quote}


> data loss when allocateBlocksToBatch
> ------------------------------------
>
>                 Key: SPARK-23991
>                 URL: https://issues.apache.org/jira/browse/SPARK-23991
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Input/Output
>    Affects Versions: 2.2.0
>         Environment: spark 2.11
>            Reporter: kevin fu
>            Priority: Major
>
> with checkpoint and WAL enabled, driver will write the allocation of blocks to batch
into hdfs. however, if it fails as following, the blocks of this batch cannot be computed
by the DAG. Because the blocks have been dequeued from the receivedBlockQueue and get lost.
> {panel:title=error log}
> 18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing record: BatchAllocationEvent(1523765480000
ms,AllocatedBlocks(Map(0 -> ArrayBuffer()))) to the WriteAheadLog. org.apache.spark.SparkException:
Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused by: java.util.concurrent.TimeoutException:
Futures timed out after [5000 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 more
> {panel}
> the concerning codes are showed below:
> {code:scala}
>   /**
>    * Allocate all unallocated blocks to the given batch.
>    * This event will get written to the write ahead log (if enabled).
>    */
>   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
>     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
>       val streamIdToBlocks = streamIds.map { streamId =>
>           (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
>       }.toMap
>       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
>       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
>         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
>         lastAllocatedBatchTime = batchTime
>       } else {
>         logInfo(s"Possibly processed batch $batchTime needs to be processed again in
WAL recovery")
>       }
>     } else {
>       // This situation occurs when:
>       // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
>       // possibly processed batch job or half-processed batch job need to be processed
again,
>       // so the batchTime will be equal to lastAllocatedBatchTime.
>       // 2. Slow checkpointing makes recovered batch time older than WAL recovered
>       // lastAllocatedBatchTime.
>       // This situation will only occurs in recovery time.
>       logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL
recovery")
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message