spark-issues mailing list archives

From "Deepak (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-23636) [SPARK 2.2] | Kafka Consumer | KafkaUtils.createRDD throws Exception - java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
Date Fri, 09 Mar 2018 18:21:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-23636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Deepak updated SPARK-23636:
---------------------------
    Description: 
h2. Summary

 

While using the KafkaUtils.createRDD API, we receive the error listed below whenever a single executor connects to a single Kafka topic-partition with more than one core and fetches an Array of OffsetRanges.

 

_I've tagged this issue with "Structured Streaming" as I could not find a more appropriate component._

 
----
h2. Error Faced
{noformat}
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access{noformat}
 Stack Trace
{noformat}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage
1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 17, host, executor
16): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded
access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1629)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1528)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1508)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:59)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.remove(CachedKafkaConsumer.scala:185)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:204)
at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:181)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323){noformat}
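The exception originates from the Kafka client itself: KafkaConsumer is documented as not thread-safe, and every public method is guarded by an internal acquire() that throws ConcurrentModificationException when a second thread enters concurrently. A minimal sketch outside Spark (the broker address is a placeholder and the thread bodies are only illustrative) shows the same message when one consumer instance is shared by two threads:
{noformat}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "broker:9092") // placeholder broker
props.put("group.id", "demo")                 // placeholder group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)

// Two threads sharing the single consumer instance: one of them typically fails with
// java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
new Thread(new Runnable { def run(): Unit = consumer.poll(1000) }).start()
new Thread(new Runnable { def run(): Unit = consumer.close() }).start()
{noformat}
In the stack trace above the same guard fires inside CachedKafkaConsumer when two tasks in the same executor JVM hit the consumer cached for the same topic-partition.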
 
----
h2. Config used to simulate the error

A session with:
 * Executors - 1
 * Cores - 2 or more
 * Kafka topic - only 1 partition
 * Fetch - more than one OffsetRange for that single topic-partition in the array passed to createRDD, for example (a session sketch follows the example below):

{noformat}
Array(OffsetRange("kafka_topic",0,608954201,608954202),
OffsetRange("kafka_topic",0,608954202,608954203)
){noformat}
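A minimal sketch of such a session, assuming spark.executor.instances and spark.executor.cores are honoured by the cluster manager (the app name is only illustrative):
{noformat}
import org.apache.spark.{SparkConf, SparkContext}

// One executor with two cores, so two tasks can run concurrently in the same JVM.
val conf = new SparkConf()
  .setAppName("kafka-createRDD-repro")
  .set("spark.executor.instances", "1")
  .set("spark.executor.cores", "2")

val sparkContext = new SparkContext(conf)
{noformat}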
 
----
h2. Was this approach working before?

This worked in Spark 1.6.2.

From Spark 2.1 onwards, however, the same approach throws the exception above.

 
----
h2. Why are we fetching from Kafka as described above?

This lets us open a connection to the Kafka broker for every Spark executor core, so each core can fetch and process its own slice of messages based on the specified offset ranges (a sketch of how the per-core ranges are built follows).
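A minimal sketch of how such per-core ranges can be built (splitRange is an illustrative helper of ours, not part of the Spark API):
{noformat}
import org.apache.spark.streaming.kafka010.OffsetRange

// Split one offset span of a single topic-partition into roughly equal chunks,
// one per executor core; each resulting OffsetRange becomes its own task in the KafkaRDD.
def splitRange(topic: String, partition: Int,
               fromOffset: Long, untilOffset: Long,
               chunks: Int): Array[OffsetRange] = {
  val total = untilOffset - fromOffset
  val step  = math.max(1L, total / chunks)
  (fromOffset until untilOffset by step).map { start =>
    OffsetRange(topic, partition, start, math.min(start + step, untilOffset))
  }.toArray
}

// e.g. two ranges for a 2-core executor:
// splitRange("kafka_topic", 0, 608954201L, 608954203L, 2)
// -> Array(OffsetRange("kafka_topic",0,608954201,608954202),
//          OffsetRange("kafka_topic",0,608954202,608954203))
{noformat}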
----
h2. Sample Code

 
Scala snippet - on Spark 2.2.0 or 2.1.0
{noformat}
// Imports (the old 0.8 kafka.serializer decoders are not needed with the 0-10 integration)
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils._
{noformat}
{noformat}
// This forces two connections - from a single executor - to topic-partition <kafka_topic-0>.
// With 2 cores assigned to 1 executor, each core gets a task pulling its own range:
// OffsetRange("kafka_topic",0,1,2) and OffsetRange("kafka_topic",0,2,3).
val parallelizedRanges = Array(
  OffsetRange("kafka_topic", 0, 1, 2), // fetches 1 record (offset 1)
  OffsetRange("kafka_topic", 0, 2, 3)  // fetches 1 record (offset 2)
)

// Initialise Kafka properties
val kafkaParams1: java.util.Map[String, Object] = new java.util.HashMap()
// kafkaParams1.put("key", "val") - add all the parameters such as broker, deserializers,
// group id... Not listing every property here.

// Create the RDD
val rDDConsumerRec: RDD[ConsumerRecord[String, String]] =
  createRDD[String, String](sparkContext,
    kafkaParams1, parallelizedRanges, LocationStrategies.PreferConsistent)

// Map each ConsumerRecord to a Row
val data: RDD[Row] = rDDConsumerRec.map { x =>
  Row(x.topic().toString, x.partition().toString, x.offset().toString,
    x.timestamp().toString, x.value())
}

// Create a DataFrame (value is a String here, matching ConsumerRecord[String, String])
val df = sqlContext.createDataFrame(data, StructType(
  Seq(
    StructField("topic", StringType),
    StructField("partition", StringType),
    StructField("offset", StringType),
    StructField("timestamp", StringType),
    StructField("value", StringType)
  )))

df.show() // You will see the error reported above.
{noformat}
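For completeness, a minimal sketch of the kind of Kafka parameters required (the broker address and group id below are placeholders; the keys are standard Kafka consumer configs):
{noformat}
val kafkaParams1: java.util.Map[String, Object] = new java.util.HashMap()
kafkaParams1.put("bootstrap.servers", "broker1:9092") // placeholder broker list
kafkaParams1.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams1.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams1.put("group.id", "sample_consumer_group")  // placeholder group id
kafkaParams1.put("auto.offset.reset", "none")          // offsets are given explicitly via OffsetRange
kafkaParams1.put("enable.auto.commit", (false: java.lang.Boolean))
{noformat}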
 
----
 
h2. Similar issue reported earlier, but on a different API

A similar issue was reported for DirectStream:

https://issues.apache.org/jira/browse/SPARK-19185


> [SPARK 2.2] | Kafka Consumer | KafkaUtils.createRDD throws Exception - java.util.ConcurrentModificationException:
KafkaConsumer is not safe for multi-threaded access
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23636
>                 URL: https://issues.apache.org/jira/browse/SPARK-23636
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.1, 2.2.0
>            Reporter: Deepak
>            Priority: Major
>              Labels: performance
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


