spark-issues mailing list archives

From "Hongyao Zhao (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout
Date Wed, 27 Jul 2016 04:12:20 GMT

     [ https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hongyao Zhao updated SPARK-16746:
---------------------------------
    Description: 
I wrote a Spark Streaming program that consumes 1000 messages from one Kafka topic, applies some transformations, and writes the results back to another topic. However, I found only 988 messages in the second topic. I checked the logs and confirmed that all messages were received by the receivers, but I also found an HDFS write timeout message logged by the class BatchedWriteAheadLog.
    
I checked the source code and found the following method in ReceivedBlockTracker:
  
{code:title=code|borderStyle=solid}
  /** Add received block. This event will get written to the write ahead log (if enabled). */
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    try {
      val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
      if (writeResult) {
        // Only a successful log write puts the block into the queue of unallocated blocks
        synchronized {
          getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
        }
        logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      } else {
        // The block is not enqueued; note that this failure is only logged at DEBUG level
        logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
      }
      writeResult
    } catch {
      case NonFatal(e) =>
        logError(s"Error adding block $receivedBlockInfo", e)
        false
    }
  }
{code}
    
It seems that ReceiverTracker tries to write the block info to HDFS, but the write times out. This causes writeToLog to return false, so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo" is skipped and the block info is lost: the block never enters the queue of unallocated blocks, so it is never allocated to a batch and its data is never processed.
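To make the failure path concrete, here is a minimal Scala sketch that models only the behaviour of the addBlock method quoted above: a block is enqueued for batch allocation only when the log write reports success. It is not Spark's code. TrackerSketch, BlockInfo, writeSucceeds and allocateBatch are hypothetical names, the write-success predicate is an assumed stand-in for an HDFS write that times out, and each block is treated as one message for simplicity.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's ReceivedBlockInfo; only the fields this sketch needs.
final case class BlockInfo(streamId: Int, blockId: String)

// Simplified, hypothetical model of the quoted addBlock logic (not Spark's actual class).
// `writeSucceeds` stands in for writeToLog: returning false models an HDFS write timeout.
final class TrackerSketch(writeSucceeds: BlockInfo => Boolean) {
  private val unallocated = mutable.Map.empty[Int, mutable.Queue[BlockInfo]]

  private def queueFor(streamId: Int): mutable.Queue[BlockInfo] =
    unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[BlockInfo])

  // Like addBlock: the block is enqueued only if the log write reports success.
  def addBlock(info: BlockInfo): Boolean = {
    val writeResult = writeSucceeds(info)
    if (writeResult) {
      synchronized { queueFor(info.streamId) += info }
    }
    writeResult // a failed write is only reported as false; the block is not retried
  }

  // Hands every queued block of a stream to the next batch and empties the queue.
  def allocateBatch(streamId: Int): List[BlockInfo] = synchronized {
    val queue = queueFor(streamId)
    val claimed = queue.toList
    queue.clear()
    claimed
  }
}

// Usage: 1000 blocks, where every 100th write "times out" (an arbitrary choice for illustration).
val tracker = new TrackerSketch(info => info.blockId.stripPrefix("block-").toInt % 100 != 0)
(1 to 1000).foreach(i => tracker.addBlock(BlockInfo(0, s"block-$i")))
val allocated = tracker.allocateBatch(0).size
println(s"received=1000 allocated=$allocated") // prints: received=1000 allocated=990
```

In this model the blocks whose write "fails" are simply absent from the batch. Nothing re-enqueues them later, which is the same outcome as the data loss described above.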

I am using Spark 1.6.1, and I left spark.streaming.receiver.writeAheadLog.enable disabled.

I would like to know whether this is the intended behaviour.

  was: the same description, except that the code block was titled Bar.scala.


> Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-16746
>                 URL: https://issues.apache.org/jira/browse/SPARK-16746
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.1
>            Reporter: Hongyao Zhao
>            Priority: Minor



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


