spark-issues mailing list archives

From "Binzi Cao (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24189) Spark Structured Streaming not working with Kafka transactions
Date Tue, 04 Sep 2018 10:49:01 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602889#comment-16602889 ]

Binzi Cao commented on SPARK-24189:
-----------------------------------

It seems I'm hitting a similar issue. I managed to set the Kafka isolation level with

{code:java}
.option("kafka.isolation.level", "read_committed")
{code}

and I'm using
{code:java}
kafka-client 1.0.0
{code}
With that setup, I'm seeing this error:


{code:java}
[error] org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure:
Task 17 in stage 0.0 failed 1 times, most recent failure: Lost task 17.0 in stage 0.0 (TID
17, localhost, executor driver): java.util.concurrent.TimeoutException: Cannot fetch record
for offset 145 in 2000 milliseconds
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:271)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
[error]         at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
[error]         at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
[error]         at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
[error]         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:152)
[error]         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:143)
[error]         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
[error]         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error]         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error]         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:205)
[error]         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[error]         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
[error]         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error]         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(generated.java:230)
{code}


So it looks like Structured Streaming is not working with a Kafka transactional topic at all.
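For reference, here is a minimal sketch of the kind of query that hits this; the broker address, topic name, and checkpoint path are placeholders, not taken from my actual setup:

{code:java}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("txn-read-test").getOrCreate()

// Read a topic written by a transactional Kafka producer.
// Note the "kafka." prefix: that is how consumer properties such as
// isolation.level are passed through to the underlying Kafka consumer.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
  .option("subscribe", "transactional-topic")           // placeholder topic
  .option("kafka.isolation.level", "read_committed")
  .load()

val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/txn-read-test")   // placeholder path
  .start()
{code}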

The exception was thrown here:
https://github.com/apache/spark/blob/v2.3.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L271-L272

Setting
{code:java}
failOnDataLoss=false
{code}
does not fix the issue, as the exception is never caught in KafkaDataConsumer.scala.
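For completeness, the one source option I can see that touches this timeout is the executor-side poll timeout. A hedged workaround sketch (the option name is from the Kafka source's documented options; the 10000 value is an arbitrary example):

{code:java}
// Raise the executor-side poll timeout (the trace above timed out at 2000 ms).
// This can only mask the symptom if the fetch is slow rather than impossible,
// e.g. it will not help if offset 145 sits inside an aborted transaction.
.option("kafkaConsumer.pollTimeoutMs", "10000")
{code}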

> Spark Structured Streaming not working with Kafka transactions
> ---------------------------------------------------------------
>
>                 Key: SPARK-24189
>                 URL: https://issues.apache.org/jira/browse/SPARK-24189
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: bharath kumar avusherla
>            Priority: Major
>
> I was trying to read a Kafka transactional topic using Spark Structured Streaming 2.3.0 with
the Kafka option isolation-level = "read_committed", but Spark reads the data immediately,
without waiting for the data in the topic to be committed. The Spark documentation mentions
that Structured Streaming supports Kafka version 0.10 or higher. I am using the command below
to test the scenario.
> val df = spark
>  .readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "test-topic")
>  .option("isolation-level","read_committed")
>  .load()
> Can you please let me know if transactional read is supported in Spark 2.3.0 Structured
Streaming, or am I missing anything?
>  
> Thank you.
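A side note on the snippet in the quoted description: the option there is written without the "kafka." prefix, and as far as I can tell an unprefixed option like that never reaches the underlying Kafka consumer, which alone would explain reading uncommitted data. The prefixed form (as used at the top of this comment) is:

{code:java}
// As written in the report; does not appear to be a recognized source option,
// so the consumer keeps its default isolation.level of read_uncommitted:
.option("isolation-level", "read_committed")

// Prefixed form that is forwarded to the underlying Kafka consumer:
.option("kafka.isolation.level", "read_committed")
{code}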


